[GitHub] spark issue #23262: [SPARK-26312][SQL]Replace RDDConversions.rowToRowRdd wit...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23262 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22764: [SPARK-25765][ML] Add training cost to BisectingKMeans s...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/22764 kindly ping @dbtsai --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23057: [SPARK-26078][SQL] Dedup self-join attributes on IN subq...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23057 @cloud-fan @gatorsmile could you please take a look at this? Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23259: [SPARK-26215][SQL][WIP] Define reserved/non-reserved key...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23259 +1 for SQL 2011. I downloaded the standard but I couldn't find any section dedicated to this. In the Postgres docs, though, they state that they do not follow the standard strictly: https://www.postgresql.org/docs/11/sql-keywords-appendix.html. Shall we follow that list and follow the standard as it is described there? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23258: [SPARK-23375][SQL][FOLLOWUP][TEST] Test Sort metr...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23258#discussion_r240146706 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala --- @@ -182,10 +182,13 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared } test("Sort metrics") { -// Assume the execution plan is -// WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1)) -val ds = spark.range(10).sort('id) -testSparkPlanMetrics(ds.toDF(), 2, Map.empty) +// Assume the execution plan with node id is +// Sort(nodeId = 0) +// Exchange(nodeId = 1) +// LocalTableScan(nodeId = 2) +val df = Seq(1, 3, 2).toDF("id").sort('id) +testSparkPlanMetrics(df, 2, Map.empty) --- End diff -- +1 for @cloud-fan suggestion. I mean, if we cannot check their exact value, we should at least check that they exist/have reasonable values. Otherwise this UT is useless. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
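A minimal sketch of the check-that-the-metrics-exist idea suggested above follows. It assumes a test in a suite extending `SharedSQLContext` (so `spark` and `testImplicits` are in scope) and assumes the metric keys exposed by `SortExec`; both should be verified against the actual code before use.

```scala
import org.apache.spark.sql.execution.SortExec
import testImplicits._

val df = Seq(1, 3, 2).toDF("id").sort('id)
df.collect()  // run the query so the Sort node's metrics are populated

// Locate the Sort node in the executed plan and assert that its metrics carry
// reasonable values, instead of asserting nothing about them at all.
val sortExec = df.queryExecution.executedPlan.collect { case s: SortExec => s }.head
assert(sortExec.metrics("peakMemory").value > 0)   // metric keys assumed
assert(sortExec.metrics("spillSize").value >= 0)
```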
[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23262#discussion_r240134191 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala --- @@ -17,51 +17,39 @@ package org.apache.spark.sql.execution +import scala.reflect.runtime.universe.TypeTag + import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Encoder, Row, SparkSession} -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.types.StructType object RDDConversions { - def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = { + def productToRowRdd[A <: Product : TypeTag](data: RDD[A], + outputSchema: StructType): RDD[InternalRow] = { --- End diff -- nit: indent --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
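For reference, the indentation being asked for is presumably the usual Spark style for multi-line parameter lists, with each parameter on its own line indented four spaces, roughly:

```scala
// Sketch of the expected formatting only, not the actual patch.
def productToRowRdd[A <: Product : TypeTag](
    data: RDD[A],
    outputSchema: StructType): RDD[InternalRow] = {
  // ...
}
```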
[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23262#discussion_r240135694 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala --- @@ -17,51 +17,39 @@ package org.apache.spark.sql.execution +import scala.reflect.runtime.universe.TypeTag + import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Encoder, Row, SparkSession} -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.types.StructType object RDDConversions { - def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = { + def productToRowRdd[A <: Product : TypeTag](data: RDD[A], + outputSchema: StructType): RDD[InternalRow] = { --- End diff -- well, seems like this is never used actually... shall we remove it instead if this is the case? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22957: [SPARK-25951][SQL] Ignore aliases for distributions and ...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/22957 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23258: [SPARK-23375][SQL][FOLLOWUP][TEST] Test Sort metr...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23258#discussion_r240003036 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala --- @@ -182,10 +182,13 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared } test("Sort metrics") { -// Assume the execution plan is -// WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1)) -val ds = spark.range(10).sort('id) -testSparkPlanMetrics(ds.toDF(), 2, Map.empty) +// Assume the execution plan with node id is +// Sort(nodeId = 0) +// Exchange(nodeId = 1) +// LocalTableScan(nodeId = 2) +val df = Seq(1, 3, 2).toDF("id").sort('id) +testSparkPlanMetrics(df, 2, Map.empty) --- End diff -- Thanks for pinging me @maropu. What is the point about checking that `LocalTableScan` contains no metrics? I checked the original PR which introduced this UT by @sameeragarwal who can maybe help us stating the goal of the test here (unless someone else can answer me, because I have not understood it). It doesn't seem even related to the Sort operator to me. Maybe I am missing something. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22957: [SPARK-25951][SQL] Ignore aliases for distributions and ...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/22957 @cloud-fan @gatorsmile I updated the PR according to the previous suggestions and added a new dedicated test suite. Could you please review this again? Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed confi...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23213 @maropu I'd say so, but I am still not sure what the difference (if any) is between `wholeStage=false,factoryMode=NO_CODEGEN` and `wholeStage=true,factoryMode=NO_CODEGEN`. `wholeStage=true,factoryMode=NO_CODEGEN` doesn't make much sense IMHO. Could you please check what that runs? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23233: [SPARK-26233][SQL][BACKPORT-2.3] CheckOverflow wh...
Github user mgaido91 closed the pull request at: https://github.com/apache/spark/pull/23233 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23234: [SPARK-26233][SQL][BACKPORT-2.2] CheckOverflow wh...
Github user mgaido91 closed the pull request at: https://github.com/apache/spark/pull/23234 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23232: [SPARK-26233][SQL][BACKPORT-2.4] CheckOverflow wh...
Github user mgaido91 closed the pull request at: https://github.com/apache/spark/pull/23232 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23232: [SPARK-26233][SQL][BACKPORT-2.4] CheckOverflow when enco...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23232 Done, thanks @dongjoon-hyun --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23210: [SPARK-26233][SQL] CheckOverflow when encoding a decimal...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23210 thanks @cloud-fan @dongjoon-hyun, I created the PRs for the backports. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23234: [SPARK-26233][SQL][BACKPORT-2.2] CheckOverflow wh...
GitHub user mgaido91 opened a pull request: https://github.com/apache/spark/pull/23234 [SPARK-26233][SQL][BACKPORT-2.2] CheckOverflow when encoding a decimal value ## What changes were proposed in this pull request? When we encode a Decimal from external source we don't check for overflow. That method is useful not only in order to enforce that we can represent the correct value in the specified range, but it also changes the underlying data to the right precision/scale. Since in our code generation we assume that a decimal has exactly the same precision and scale of its data type, missing to enforce it can lead to corrupted output/results when there are subsequent transformations. ## How was this patch tested? added UT You can merge this pull request into a Git repository by running: $ git pull https://github.com/mgaido91/spark SPARK-26233_2.2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23234.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23234 commit 930c51029b845c74357305e7ec30a4f2e6ea748a Author: Marco Gaido Date: 2018-12-04T18:33:27Z [SPARK-26233][SQL] CheckOverflow when encoding a decimal value When we encode a Decimal from external source we don't check for overflow. That method is useful not only in order to enforce that we can represent the correct value in the specified range, but it also changes the underlying data to the right precision/scale. Since in our code generation we assume that a decimal has exactly the same precision and scale of its data type, missing to enforce it can lead to corrupted output/results when there are subsequent transformations. added UT Closes #23210 from mgaido91/SPARK-26233. Authored-by: Marco Gaido Signed-off-by: Dongjoon Hyun --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23234: [SPARK-26233][SQL][BACKPORT-2.2] CheckOverflow when enco...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23234 cc @cloud-fan @dongjoon-hyun --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23233: [SPARK-26233][SQL][BACKPORT-2.3] CheckOverflow when enco...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23233 cc @cloud-fan @dongjoon-hyun --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23233: [SPARK-26233][SQL][BACKPORT-2.3] CheckOverflow wh...
GitHub user mgaido91 opened a pull request: https://github.com/apache/spark/pull/23233 [SPARK-26233][SQL][BACKPORT-2.3] CheckOverflow when encoding a decimal value ## What changes were proposed in this pull request? When we encode a Decimal from external source we don't check for overflow. That method is useful not only in order to enforce that we can represent the correct value in the specified range, but it also changes the underlying data to the right precision/scale. Since in our code generation we assume that a decimal has exactly the same precision and scale of its data type, missing to enforce it can lead to corrupted output/results when there are subsequent transformations. ## How was this patch tested? added UT You can merge this pull request into a Git repository by running: $ git pull https://github.com/mgaido91/spark SPARK-26233_2.3 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23233.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23233 commit a1e77445c2675137fbcddf73181c47469f159dbf Author: Marco Gaido Date: 2018-12-04T18:33:27Z [SPARK-26233][SQL] CheckOverflow when encoding a decimal value When we encode a Decimal from external source we don't check for overflow. That method is useful not only in order to enforce that we can represent the correct value in the specified range, but it also changes the underlying data to the right precision/scale. Since in our code generation we assume that a decimal has exactly the same precision and scale of its data type, missing to enforce it can lead to corrupted output/results when there are subsequent transformations. added UT Closes #23210 from mgaido91/SPARK-26233. Authored-by: Marco Gaido Signed-off-by: Dongjoon Hyun --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23232: [SPARK-26233][SQL][BACKPORT-2.4] CheckOverflow when enco...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23232 cc @cloud-fan @dongjoon-hyun --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23232: [SPARK-26233][SQL][BACKPORT-2.4] CheckOverflow wh...
GitHub user mgaido91 opened a pull request: https://github.com/apache/spark/pull/23232 [SPARK-26233][SQL][BACKPORT-2.4] CheckOverflow when encoding a decimal value When we encode a Decimal from external source we don't check for overflow. That method is useful not only in order to enforce that we can represent the correct value in the specified range, but it also changes the underlying data to the right precision/scale. Since in our code generation we assume that a decimal has exactly the same precision and scale of its data type, missing to enforce it can lead to corrupted output/results when there are subsequent transformations. added UT Closes #23210 from mgaido91/SPARK-26233. Authored-by: Marco Gaido Signed-off-by: Dongjoon Hyun ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mgaido91/spark SPARK-26233_2.4 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23232.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23232 commit 821db4854c0e685aac3168da75a1c839681dbfc4 Author: Marco Gaido Date: 2018-12-04T18:33:27Z [SPARK-26233][SQL] CheckOverflow when encoding a decimal value When we encode a Decimal from external source we don't check for overflow. That method is useful not only in order to enforce that we can represent the correct value in the specified range, but it also changes the underlying data to the right precision/scale. Since in our code generation we assume that a decimal has exactly the same precision and scale of its data type, missing to enforce it can lead to corrupted output/results when there are subsequent transformations. added UT Closes #23210 from mgaido91/SPARK-26233. Authored-by: Marco Gaido Signed-off-by: Dongjoon Hyun --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed confi...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23213 Yes, I am wondering too: what is the difference between `spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=NO_CODEGEN` and `spark.sql.codegen.wholeStage=true,spark.sql.codegen.factoryMode=NO_CODEGEN`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
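For anyone trying to answer that question experimentally, a small way to compare the two combinations is to re-run the same query under each setting and dump the generated code. The config keys are the ones quoted above; the comment about what `factoryMode` governs is an assumption to verify, not an established fact.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug._  // provides debugCodegen()

val spark = SparkSession.builder().master("local[*]").appName("codegen-modes").getOrCreate()

// wholeStage is assumed to control operator fusion, while factoryMode (an
// internal, test-oriented conf) is assumed to force the interpreted expression
// path; that distinction is exactly what is being questioned here.
spark.conf.set("spark.sql.codegen.factoryMode", "NO_CODEGEN")

spark.conf.set("spark.sql.codegen.wholeStage", "false")
spark.range(10).selectExpr("id + 1 AS x").debugCodegen()

spark.conf.set("spark.sql.codegen.wholeStage", "true")
spark.range(10).selectExpr("id + 1 AS x").debugCodegen()
```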
[GitHub] spark issue #23217: [SPARK-25829][SQL][FOLLOWUP] Refactor MapConcat in order...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23217 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23217: [SPARK-25829][SQL][FOLLOWUP] Refactor MapConcat i...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23217#discussion_r238700354 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala --- @@ -47,13 +48,17 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria private lazy val keyGetter = InternalRow.getAccessor(keyType) private lazy val valueGetter = InternalRow.getAccessor(valueType) - def put(key: Any, value: Any): Unit = { + def put(key: Any, value: Any, withSizeCheck: Boolean = false): Unit = { if (key == null) { throw new RuntimeException("Cannot use null as map key.") } val index = keyToIndex.getOrDefault(key, -1) if (index == -1) { + if (withSizeCheck && size >= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { --- End diff -- ok, let me remove it then, thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238696879 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { + case a: Alias => (a.toAttribute, a) +})) + // Ensure that the operator's children satisfy their output distribution requirements. children = children.zip(requiredChildDistributions).map { - case (child, distribution) if child.outputPartitioning.satisfies(distribution) => + case (child, distribution) if child.outputPartitioning.satisfies( + distribution.mapExpressions(replaceAlias(_, aliasMap))) => --- End diff -- Ok, I think I got it now, sorry, I didn't understand :) yes I think this is doable then. Let me try and do that, thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23217: [SPARK-25829][SQL][FOLLOWUP] Refactor MapConcat i...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23217#discussion_r238690465 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala --- @@ -47,13 +48,17 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria private lazy val keyGetter = InternalRow.getAccessor(keyType) private lazy val valueGetter = InternalRow.getAccessor(valueType) - def put(key: Any, value: Any): Unit = { + def put(key: Any, value: Any, withSizeCheck: Boolean = false): Unit = { if (key == null) { throw new RuntimeException("Cannot use null as map key.") } val index = keyToIndex.getOrDefault(key, -1) if (index == -1) { + if (withSizeCheck && size >= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { --- End diff -- this flag is just for perf reasons, we can skip the check in some conditions and I didn't want to introduce perf overhead if not needed. If we remove the flag we would do the comparison for each item, also when it is not needed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
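To make the trade-off explicit, the pattern under discussion looks roughly like the self-contained sketch below: the check only runs on the new-key path, and only when the caller asks for it, so call sites that already know their input cannot exceed the limit pay nothing. Names and the limit are illustrative, not the actual Spark code.

```scala
import java.util.{HashMap => JHashMap}
import scala.collection.mutable.ArrayBuffer

class MapBuilderSketch(limit: Int) {
  private val keyToIndex = new JHashMap[Any, Int]()
  private val keys = ArrayBuffer.empty[Any]
  private val values = ArrayBuffer.empty[Any]

  def put(key: Any, value: Any, withSizeCheck: Boolean = false): Unit = {
    require(key != null, "Cannot use null as map key.")
    val index = keyToIndex.getOrDefault(key, -1)
    if (index == -1) {
      // Executed once per *new* key, and only when requested, so callers that
      // cannot overflow (e.g. rebuilding a single already-valid map) skip it.
      if (withSizeCheck && keys.size >= limit) {
        throw new RuntimeException(s"Cannot build a map with more than $limit elements.")
      }
      keyToIndex.put(key, keys.size)
      keys += key
      values += value
    } else {
      values(index) = value  // last write wins for duplicate keys
    }
  }
}
```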
[GitHub] spark issue #23217: [SPARK-25829][SQL][FOLLOWUP] Refactor MapConcat in order...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23217 cc @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23217: [SPARK-25829][SQL][FOLLOWUP] Refactor MapConcat i...
GitHub user mgaido91 opened a pull request: https://github.com/apache/spark/pull/23217 [SPARK-25829][SQL][FOLLOWUP] Refactor MapConcat in order to check properly the limit size ## What changes were proposed in this pull request? The PR starts from the [comment](https://github.com/apache/spark/pull/23124#discussion_r236112390) in the main one and it aims at: - simplifying the code for `MapConcat`; - be more precise in checking the limit size. ## How was this patch tested? existing tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/mgaido91/spark SPARK-25829_followup Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23217.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23217 commit 54f0f31aaa14de7c44c336580c7ed18e8ffb4b54 Author: Marco Gaido Date: 2018-12-04T12:35:09Z [SPARK-25829][SQL][FOLLOWUP] Refactor MapConcat in order to check properly the limit size --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238642801 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { + case a: Alias => (a.toAttribute, a) +})) + // Ensure that the operator's children satisfy their output distribution requirements. children = children.zip(requiredChildDistributions).map { - case (child, distribution) if child.outputPartitioning.satisfies(distribution) => + case (child, distribution) if child.outputPartitioning.satisfies( + distribution.mapExpressions(replaceAlias(_, aliasMap))) => --- End diff -- I don't think that is right: that would cause the shuffle to happen for every plan which is hashed by both `[hash part c, hash part b]` and `[hash part d, hash part b]` (and also `[hash part a, hash part b]`). I think that if we want to go that way, we'd need a set of equivalent outputPatitioning --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23213: [SPARK-26262][SQL] Run SQLQueryTestSuite with WHOLESTAGE...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23213 > I personally think its orthogonal to SPARK-24562. yes I agree. I am just asking if it makes sense to create a framework like that. Now it is only about codegen, but in the future we may want to add more configs. What do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238630487 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { + case a: Alias => (a.toAttribute, a) +})) + // Ensure that the operator's children satisfy their output distribution requirements. children = children.zip(requiredChildDistributions).map { - case (child, distribution) if child.outputPartitioning.satisfies(distribution) => + case (child, distribution) if child.outputPartitioning.satisfies( + distribution.mapExpressions(replaceAlias(_, aliasMap))) => --- End diff -- ah I see now what you mean. I am not sure what you are suggesting is feasible. Imagine that in your example the Project is: `Project(a as c, a as d, b, relation)`. What should the output partitioning be? > What do you mean by ... I meant that when we collect the alias for `a as c`, we are mapping all the attr references of `c` with `a as c` here. In the `outputPartitioning`, there will never be an occurrence of a reference to `c`, but only references to `a`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
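To spell the example out: over a child hash-partitioned by `(a, b)`, a `Project(a as c, a as d, b)` could report its single `outputPartitioning` as hashed by `(c, b)`, by `(d, b)`, or by `(a, b)`, and all three describe the same physical layout, which is why a single rewritten partitioning is not enough. A DataFrame-level reproduction of that plan shape (column names and data are illustrative) might be:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("alias-partitioning").getOrCreate()

// The aggregate leaves the data hash-partitioned by (a, b); the projection
// then renames `a` twice, making the "correct" output partitioning ambiguous.
val child = spark.range(100)
  .selectExpr("id % 10 AS a", "id % 7 AS b", "id AS v")
  .groupBy("a", "b").count()

val projected = child.select(col("a").as("c"), col("a").as("d"), col("b"), col("count"))
projected.explain()
```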
[GitHub] spark issue #23213: [SPARK-26262][SQL] Run SQLQueryTestSuite with WHOLESTAGE...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23213 Just a question: why didn't we introduce something like what was done in SPARK-24562? I see that these are configs which apply to all queries, so reusing what was done in SPARK-24562 as-is is not a good idea, but what about something similar (e.g. a file with all the config sets to use)? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22450: [SPARK-25454][SQL] Avoid precision loss in division with...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/22450 @cloud-fan this has been stuck for a while now. Is there something blocking this? Is there something I can do? Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238583330 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { + case a: Alias => (a.toAttribute, a) +})) + // Ensure that the operator's children satisfy their output distribution requirements. children = children.zip(requiredChildDistributions).map { - case (child, distribution) if child.outputPartitioning.satisfies(distribution) => + case (child, distribution) if child.outputPartitioning.satisfies( + distribution.mapExpressions(replaceAlias(_, aliasMap))) => --- End diff -- But `ProjectExec.outputPartitioning` cannot contain a reference to the aliases in its project list, as its output partitioning is the one of the child, where that alias doesn't exist. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23210: [SPARK-26233][SQL] CheckOverflow when encoding a ...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23210#discussion_r238471660 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala --- @@ -1647,6 +1647,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext { checkDataset(ds, data: _*) checkAnswer(ds.select("x"), Seq(Row(1), Row(2))) } + + test("SPARK-26233: serializer should enforce decimal precision and scale") { --- End diff -- Well, everything is possible, but it is not easy actually. Because the issue here happens in the codegen, not when we retrieve the output. So if we just encode and decode everything is fine. The problem happens if there is any transformation in the codegen meanwhile, because there the underlying decimal is used (assuming that it has the same precision and scale of the data type - which without the current change is not always true). I tried checking the precision and scale of the serialized object, but it is not really feasible as they are converted when it is read (please see `UnsafeRow`)... So I'd avoid this actually. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
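A rough sketch of the failure mode described here (values are made up, and it assumes a suite extending `SharedSQLContext` so that `testImplicits` is available): the external `java.math.BigDecimal` maps to `DecimalType(38, 18)`, the source value has a different scale, and only a transformation, not a plain round-trip, forces the generated code to rely on the value matching the declared type.

```scala
import testImplicits._

// Scale-1 value stored under a DecimalType(38, 18) column; without CheckOverflow
// in the serializer the underlying Decimal keeps the source's precision/scale.
val ds = Seq(Tuple1(new java.math.BigDecimal("123.4"))).toDS()

ds.collect().foreach(println)   // plain encode/decode round-trip looks fine either way
ds.select($"_1" + 1).show()     // a transformation exercises the codegen path discussed above
```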
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238460901 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { + case a: Alias => (a.toAttribute, a) +})) + // Ensure that the operator's children satisfy their output distribution requirements. children = children.zip(requiredChildDistributions).map { - case (child, distribution) if child.outputPartitioning.satisfies(distribution) => + case (child, distribution) if child.outputPartitioning.satisfies( + distribution.mapExpressions(replaceAlias(_, aliasMap))) => --- End diff -- this is not dealing with the aliases in the `outputPartitioning` but with the ones in the `requiredChildDistribution`. Anyway, I wouldn't do it there, because this would mean moving also the logic for collecting the aliases from the children there, which seems to me an operations which belong to a rule/transforming operator, rather than to the plan operator itself (eg. now these methods are in `PredicateHelper`...). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238459238 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { --- End diff -- I think it is. We are only checking the presence of aliases. In particular, we are collecting all the aliases which are defined in the previous operator. The solution you are suggesting works too IMHO and restricts the scope, but I am not sure it is a good thing, because I see no harm in doing it for other operators: simply they won't contain aliases; while I do see some issues in the maintenance of the "whitelist" of operators you are suggesting (we may miss some now or forget to update later...) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23210: [SPARK-26233][SQL] CheckOverflow when encoding a ...
GitHub user mgaido91 opened a pull request: https://github.com/apache/spark/pull/23210 [SPARK-26233][SQL] CheckOverflow when encoding a decimal value ## What changes were proposed in this pull request? When we encode a Decimal from external source we don't check for overflow. That method is useful not only in order to enforce that we can represent the correct value in the specified range, but it also changes the underlying data to the right precision/scale. Since in our code generation we assume that a decimal has exactly the same precision and scale of its data type, missing to enforce it can lead to corrupted output/results when there are subsequent transformations. ## How was this patch tested? added UT You can merge this pull request into a Git repository by running: $ git pull https://github.com/mgaido91/spark SPARK-26233 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23210.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23210 commit 91d3e1b49667d3d5023663c8507570a118c54254 Author: Marco Gaido Date: 2018-12-03T16:16:08Z [SPARK-26233][SQL] CheckOverflow when encoding a decimal value --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
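At the level of the value itself, the "changes the underlying data to the right precision/scale" part corresponds roughly to the behaviour below; this is a standalone illustration of the catalyst `Decimal` API, not the patch itself.

```scala
import org.apache.spark.sql.types.Decimal

// A Decimal built from an external BigDecimal initially carries the source's
// precision/scale (4, 1 here), not the declared DecimalType's.
val d = Decimal(BigDecimal("123.4"))

// Forcing it to the declared type both validates the range (false on overflow)
// and rewrites the underlying representation to the target precision/scale.
val fits = d.changePrecision(10, 2)
assert(fits && d.precision == 10 && d.scale == 2)
```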
[GitHub] spark issue #23057: [SPARK-26078][SQL] Dedup self-join attributes on IN subq...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23057 @mccheah this is waiting for reviews by committers --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238292468 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -780,6 +780,23 @@ class PlannerSuite extends SharedSQLContext { classOf[PartitioningCollection]) } } + + test("SPARK-25951: avoid redundant shuffle on rename") { --- End diff -- thanks @gatorsmile, I added also a negative case, but I don't think it is enough. Do you have some hints on cases to test? Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238291824 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -223,14 +223,35 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. + * + * This method should be used (instead of `sameResult`) when comparing if 2 expressions are the + * same and one can replace the other (eg. in Optimizer/Analyzer rules where we want to replace + * equivalent expressions). It should not be used (and `sameResult` should be used instead) when + * comparing if 2 expressions produce the same results (in this case `semanticEquals` can be too + * strict). */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output from + * plan perspective may be different, because of different names or similar differences. + * Usually this means that their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + * + * This method should be used (instead of `semanticEquals`) when checking if 2 expressions + * produce the same results (eg. as in the case we are interested to check if the ordering is the + * same). It should not be used (and `semanticEquals` should be used instead) when comparing if 2 + * expressions are the same and one can replace the other. + */ + final def sameResult(other: Expression): Boolean = --- End diff -- thanks, added --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23186: [SPARK-26230][SQL]FileIndex: if case sensitive, validate...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23186 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23150: [SPARK-26178][SQL] Use java.time API for parsing ...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23150#discussion_r238195206 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1618,6 +1618,13 @@ object SQLConf { "a SparkConf entry.") .booleanConf .createWithDefault(true) + + val LEGACY_TIME_PARSER_ENABLED = buildConf("spark.sql.legacy.timeParser.enabled") +.doc("When set to true, java.text.SimpleDateFormat is using for formatting and parsing " + + " dates/timestamps in a locale-sensitive manner. When set to false, classes from " + + "java.time.* packages are using for the same purpose.") --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23150: [SPARK-26178][SQL] Use java.time API for parsing ...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23150#discussion_r238195153 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1618,6 +1618,13 @@ object SQLConf { "a SparkConf entry.") .booleanConf .createWithDefault(true) + + val LEGACY_TIME_PARSER_ENABLED = buildConf("spark.sql.legacy.timeParser.enabled") +.doc("When set to true, java.text.SimpleDateFormat is using for formatting and parsing " + --- End diff -- nit `using` -> `used` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23186: [SPARK-26230][SQL]FileIndex: if case sensitive, v...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23186#discussion_r238008395 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala --- @@ -345,15 +346,18 @@ object PartitioningUtils { */ def resolvePartitions( pathsWithPartitionValues: Seq[(Path, PartitionValues)], + caseSensitive: Boolean, timeZone: TimeZone): Seq[PartitionValues] = { if (pathsWithPartitionValues.isEmpty) { Seq.empty } else { - // TODO: Selective case sensitivity. - val distinctPartColNames = - pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase())).distinct + val distinctPartColNames = if (caseSensitive) { +pathsWithPartitionValues.map(_._2.columnNames) + } else { +pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase())) + } assert( -distinctPartColNames.size == 1, +distinctPartColNames.distinct.size == 1, listConflictingPartitionColumns(pathsWithPartitionValues)) --- End diff -- yes I see, thanks for the kind explanation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237905754 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -780,6 +780,23 @@ class PlannerSuite extends SharedSQLContext { classOf[PartitioningCollection]) } } + + test("SPARK-25951: avoid redundant shuffle on rename") { --- End diff -- @cloud-fan @viirya I added the test, but as I mentioned I had to do another change in order to make it working. Sorry for the mistake. I'd really appreciate if you could review it again. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23186: [SPARK-26230][SQL]FileIndex: if case sensitive, v...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23186#discussion_r237889346 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala --- @@ -345,15 +346,18 @@ object PartitioningUtils { */ def resolvePartitions( pathsWithPartitionValues: Seq[(Path, PartitionValues)], + caseSensitive: Boolean, timeZone: TimeZone): Seq[PartitionValues] = { if (pathsWithPartitionValues.isEmpty) { Seq.empty } else { - // TODO: Selective case sensitivity. - val distinctPartColNames = - pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase())).distinct + val distinctPartColNames = if (caseSensitive) { +pathsWithPartitionValues.map(_._2.columnNames) + } else { +pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase())) + } assert( -distinctPartColNames.size == 1, +distinctPartColNames.distinct.size == 1, listConflictingPartitionColumns(pathsWithPartitionValues)) --- End diff -- why don't we use `distinctPartColNames` as parameter here? Moreover, is that method working fine according to case sensitivity? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23186: [SPARK-26230][SQL]FileIndex: if case sensitive, v...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23186#discussion_r237889521 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala --- @@ -65,6 +65,34 @@ class FileIndexSuite extends SharedSQLContext { } } + test("SPARK-26230: if case sensitive, validate partitions with original column names") { +withTempDir { dir => + val partitionDirectory = new File(dir, s"a=1") + partitionDirectory.mkdir() + val file = new File(partitionDirectory, "text.txt") + stringToFile(file, "text") + val partitionDirectory2 = new File(dir, s"A=2") + partitionDirectory2.mkdir() + val file2 = new File(partitionDirectory2, "text.txt") + stringToFile(file2, "text") + val path = new Path(dir.getCanonicalPath) + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { +val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) +val partitionValues = fileIndex.partitionSpec().partitions.map(_.values) +assert(partitionValues.length == 2) + } + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { +val msg = intercept[AssertionError] { + val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) + fileIndex.partitionSpec() +}.getMessage +assert(msg.contains("Conflicting partition column names detected")) --- End diff -- can we ensure that the message contains the right partitions? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
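Since the two directories created by this test are `a=1` and `A=2`, a tighter version of the assertion could also pin those down; the exact wording depends on `listConflictingPartitionColumns`, so treat the extra checks as a sketch.

```scala
assert(msg.contains("Conflicting partition column names detected"))
// Additionally verify the message names the actual conflicting partitions:
assert(msg.contains("a=1") && msg.contains("A=2"))
```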
[GitHub] spark pull request #23186: [SPARK-26230][SQL]FileIndex: if case sensitive, v...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23186#discussion_r237888926 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala --- @@ -345,15 +346,18 @@ object PartitioningUtils { */ def resolvePartitions( pathsWithPartitionValues: Seq[(Path, PartitionValues)], + caseSensitive: Boolean, timeZone: TimeZone): Seq[PartitionValues] = { if (pathsWithPartitionValues.isEmpty) { Seq.empty } else { - // TODO: Selective case sensitivity. - val distinctPartColNames = - pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase())).distinct + val distinctPartColNames = if (caseSensitive) { --- End diff -- nit: maybe rename as there is no distinct anymore? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237887275 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -780,6 +780,23 @@ class PlannerSuite extends SharedSQLContext { classOf[PartitioningCollection]) } } + + test("SPARK-25951: avoid redundant shuffle on rename") { --- End diff -- ah, good point and indeed very useful. In my previous tests I always used a very simple query to verify this and never the one reported in the JIRA. Now I tried that one and I realized that this fix is not very useful as of now, because with a rename like that, the `HashPartitioning` contains the `AttributeReference` produced by the `Alias`, rather than the `Alias` itself. Since that is the common case, the PR as it is now is not very useful. If I am not able to figure out a good way to handle that, I am going to close this. Thanks and sorry for the trouble. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
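For context, a query of the shape reported in the JIRA is assumed to look like the sketch below (the exact query is not quoted in this thread): a column is renamed between two operations that hash-partition on it, so the child's `HashPartitioning` still references the original attribute while the parent requires a distribution on the new name.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("rename-shuffle").getOrCreate()

// Hypothetical SPARK-25951 shape: the rename should ideally not force a second
// exchange, since `a` and its alias `x` carry the same data.
val t1 = spark.range(100).selectExpr("id % 10 AS a", "id AS v")
val aggregated = t1.groupBy("a").count()               // hash-partitioned by a
val renamed = aggregated.withColumnRenamed("a", "x")   // Project(a AS x, count)
renamed.groupBy("x").count().explain()                 // look for a redundant Exchange here
```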
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237787920 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,35 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. + * + * This method should be used (instead of `sameResult`) when comparing if 2 expressions are the + * same and one can replace the other (eg. in Optimizer/Analyzer rules where we want to replace + * equivalent expressions). It should not be used (and `sameResult` should be used instead) when + * comparing if 2 expressions produce the same results (in this case `semanticEquals` can be too + * strict). */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. --- End diff -- yes, I mean: `sameResult` returns true if 2 expressions return the same data even though from plan perspective they are not the same (eg. the output name/exprIds is different as in this case), while `semanticEquals` ensure they are the same from plan perspective too. If you have better suggestions how to rephrase this, I am happy to improve it. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23183: [SPARK-26226][SQL] Update query tracker to report timeli...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23183 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23171 Yes @aokolnychyi, I agree that the work can be done later (not in the scope of this PR). We can maybe just open a new JIRA about it so we won't forget. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23171 @dbtsai I see; it would be great, though, to check what this threshold is. My understanding is that the current solution has better performance even for several hundred items. If this number is in the thousands, and since it depends on the data type (so it is hard for users to control with a single config), it is arguable which solution is best: I don't think it is very common to have thousands of elements, while for the lower (more common) counts we would be using the less efficient solution. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts,...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23171#discussion_r237564039 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala --- @@ -335,6 +343,41 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate { """.stripMargin) } + private def genCodeWithSwitch(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val (nullLiterals, nonNullLiterals) = list.partition { + case Literal(null, _) => true + case _ => false +} +val listGen = nonNullLiterals.map(_.genCode(ctx)) +val valueGen = value.genCode(ctx) + +val caseBranches = listGen.map(literal => + s""" + |case ${literal.value}: + | ${ev.value} = true; + | break; + """.stripMargin) + +ev.copy(code = + code""" + |${valueGen.code} + |${CodeGenerator.JAVA_BOOLEAN} ${ev.isNull} = ${valueGen.isNull}; + |${CodeGenerator.JAVA_BOOLEAN} ${ev.value} = false; + |if (!${valueGen.isNull}) { + | switch (${valueGen.value}) { + |${caseBranches.mkString("")} --- End diff -- we should consider that if the number of items is very big, this can cause a compile exception due to the method size limit. So we should use the proper splitting methods for the code --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
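On the size concern: all the `case` arms end up in a single generated Java method, and the JVM rejects methods whose bytecode exceeds 64KB, so a very large IN list could fail to compile. A rough sketch of the chunking idea, with a hypothetical helper rather than the actual `CodegenContext` splitting API, is:

```scala
// Emit the cases in bounded chunks, one helper method per chunk, so that no
// single generated method grows past the JVM bytecode limit.
def genSwitchChunks(literals: Seq[Int], maxCasesPerMethod: Int = 1000): Seq[String] =
  literals.grouped(maxCasesPerMethod).zipWithIndex.map { case (chunk, i) =>
    s"""
       |private boolean valueInChunk$i(int value) {
       |  switch (value) {
       |    ${chunk.map(v => s"case $v: return true;").mkString("\n    ")}
       |    default: return false;
       |  }
       |}
     """.stripMargin
  }.toSeq
```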
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237539036 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2542,10 +2542,10 @@ object EliminateUnions extends Rule[LogicalPlan] { * rule can't work for those parameters. */ object CleanupAliases extends Rule[LogicalPlan] { - private def trimAliases(e: Expression): Expression = { + private[catalyst] def trimAliases(e: Expression): Expression = { e.transformDown { - case Alias(child, _) => child - case MultiAlias(child, _) => child + case Alias(child, _) => trimAliases(child) --- End diff -- just using `transformUp` solves the issue --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
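The behavioural difference is easiest to see on two directly nested aliases: with `transformDown`, once the rule rewrites the outer `Alias` to its child, the traversal continues only on that child's children and never revisits the child itself, so an inner `Alias` sitting immediately underneath survives; `transformUp` rewrites from the leaves and strips both. A minimal sketch using the catalyst DSL (internal APIs, shown only to illustrate the traversal order):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}

val a = 'a.int
val nested: Expression = Alias(Alias(a, "b")(), "c")()   // two consecutive aliases

def trimDown(e: Expression) = e.transformDown { case Alias(child, _) => child }
def trimUp(e: Expression)   = e.transformUp   { case Alias(child, _) => child }

// transformDown replaces the outer Alias with the inner one and then only
// visits the replacement's children, so the inner Alias is left in place.
assert(trimDown(nested).isInstanceOf[Alias])
// transformUp strips aliases bottom-up and removes both.
assert(trimUp(nested) == a)
```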
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237536540 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2542,10 +2542,10 @@ object EliminateUnions extends Rule[LogicalPlan] { * rule can't work for those parameters. */ object CleanupAliases extends Rule[LogicalPlan] { - private def trimAliases(e: Expression): Expression = { + private[catalyst] def trimAliases(e: Expression): Expression = { e.transformDown { - case Alias(child, _) => child - case MultiAlias(child, _) => child + case Alias(child, _) => trimAliases(child) --- End diff -- ah, I did a stupid thing here. So the problem is that: since it returns `child` for `this`, in transformDown we apply the rule to `child` children, instead of applying to `child` itself. So the problem here is with 2 consecutive `Alias`. Let me find a better fix. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237526646 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2542,10 +2542,10 @@ object EliminateUnions extends Rule[LogicalPlan] { * rule can't work for those parameters. */ object CleanupAliases extends Rule[LogicalPlan] { - private def trimAliases(e: Expression): Expression = { + private[catalyst] def trimAliases(e: Expression): Expression = { e.transformDown { - case Alias(child, _) => child - case MultiAlias(child, _) => child + case Alias(child, _) => trimAliases(child) --- End diff -- the point is that now this method removes only the first `Alias` it finds (and it doesn't go on recursively), which is the reason of the UT failure. Also checking the comment on the method it seems not the expected behavior of this method. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23165: [SPARK-26188][SQL] FileIndex: don't infer data ty...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23165#discussion_r237518419 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala --- @@ -250,7 +276,13 @@ object PartitioningUtils { val rawColumnValue = columnSpec.drop(equalSignIndex + 1) assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'") - val literal = inferPartitionColumnValue(rawColumnValue, typeInference, timeZone) + val literal = if (userSpecifiedDataTypes.contains(columnName)) { +// SPARK-26188: if user provides corresponding column schema, process the column as String +// type and cast it as user specified data type later. +inferPartitionColumnValue(rawColumnValue, false, timeZone) --- End diff -- can't we make it returning `Option[(String, Literal)]`? If not, what about `Literal(Cast(...).eval())`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
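For clarity, the alternative suggested here (evaluate the cast eagerly and wrap the result back into a typed literal) would look roughly like the sketch below; the variables stand in for the ones in the diff above, and the exact construction in the final patch may differ.

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.IntegerType

val rawColumnValue = "42"            // raw partition value parsed from the path
val userType = IntegerType           // stand-in for userSpecifiedDataTypes(columnName)

// Parse as a string first, then cast eagerly to the user-specified type so the
// rest of the partition-discovery code still sees a plain typed Literal.
val literal = Literal(Cast(Literal(rawColumnValue), userType).eval(), userType)
```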
[GitHub] spark pull request #23165: [SPARK-26188][SQL] FileIndex: don't infer data ty...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23165#discussion_r237510736 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala --- @@ -250,7 +276,13 @@ object PartitioningUtils { val rawColumnValue = columnSpec.drop(equalSignIndex + 1) assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'") - val literal = inferPartitionColumnValue(rawColumnValue, typeInference, timeZone) + val literal = if (userSpecifiedDataTypes.contains(columnName)) { +// SPARK-26188: if user provides corresponding column schema, process the column as String +// type and cast it as user specified data type later. +inferPartitionColumnValue(rawColumnValue, false, timeZone) --- End diff -- can't we add the cast here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23165: [SPARK-26188][SQL] FileIndex: don't infer data ty...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23165#discussion_r237510234 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala --- @@ -94,18 +94,34 @@ object PartitioningUtils { paths: Seq[Path], typeInference: Boolean, basePaths: Set[Path], + userSpecifiedSchema: Option[StructType], + caseSensitive: Boolean, timeZoneId: String): PartitionSpec = { -parsePartitions(paths, typeInference, basePaths, DateTimeUtils.getTimeZone(timeZoneId)) +parsePartitions(paths, typeInference, basePaths, userSpecifiedSchema, + caseSensitive, DateTimeUtils.getTimeZone(timeZoneId)) } private[datasources] def parsePartitions( paths: Seq[Path], typeInference: Boolean, basePaths: Set[Path], + userSpecifiedSchema: Option[StructType], + caseSensitive: Boolean, timeZone: TimeZone): PartitionSpec = { +val userSpecifiedDataTypes = if (userSpecifiedSchema.isDefined) { + val nameToDataType = userSpecifiedSchema.get.fields.map(f => f.name -> f.dataType).toMap + if (caseSensitive) { +CaseInsensitiveMap(nameToDataType) --- End diff -- isn't this if `!caseSensitive`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
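For clarity, a sketch of the branching the question above is pointing at (the helper wrapper and the `Map.empty` fallback are assumptions for the example, not part of the quoted diff): the lookup should be case-insensitive exactly when the analysis is not case sensitive, i.e. the opposite of the quoted condition:

```scala
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.types.{DataType, StructType}

object PartitionSchemaLookupSketch {
  // Wrap the name -> type map in CaseInsensitiveMap only when the session is
  // NOT case sensitive, so lookups ignore case exactly when analysis does.
  def userSpecifiedDataTypes(
      userSpecifiedSchema: Option[StructType],
      caseSensitive: Boolean): Map[String, DataType] = {
    userSpecifiedSchema match {
      case Some(schema) =>
        val nameToDataType = schema.fields.map(f => f.name -> f.dataType).toMap
        if (caseSensitive) nameToDataType else CaseInsensitiveMap(nameToDataType)
      case None => Map.empty
    }
  }
}
```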
[GitHub] spark issue #22957: [SPARK-25951][SQL] Ignore aliases for distributions and ...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/22957 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23150: [SPARK-26178][SQL] Use java.time API for parsing ...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23150#discussion_r237451758 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala --- @@ -23,10 +23,16 @@ import scala.util.control.Exception.allCatch import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion -import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.util.DateTimeFormatter import org.apache.spark.sql.types._ -object CSVInferSchema { +class CSVInferSchema(val options: CSVOptions) extends Serializable { --- End diff -- since we get the `CSVOptions` in the constructor, shall we remove it as a parameter from the various methods? It is pretty confusing which one is used right now... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
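As a sketch of the suggestion (the method body below is a placeholder rather than the real inference logic, and the class name is changed to make the illustrative nature explicit): once `options` is a constructor field, the per-method `options` parameter can simply be dropped, leaving a single unambiguous source of the options:

```scala
import org.apache.spark.sql.catalyst.csv.CSVOptions
import org.apache.spark.sql.types.{DataType, StringType}

class CSVInferSchemaSketch(val options: CSVOptions) extends Serializable {

  // before: def inferField(typeSoFar: DataType, field: String, options: CSVOptions): DataType
  def inferField(typeSoFar: DataType, field: String): DataType = {
    if (field == null || field.isEmpty || field == options.nullValue) {
      typeSoFar    // nothing can be learned from a null/empty field
    } else {
      StringType   // the real code tries narrower types before falling back to StringType
    }
  }
}
```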
[GitHub] spark issue #23176: [SPARK-26211][SQL] Fix InSet for binary, and struct and ...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23176 LGTM, thanks for the fix! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23176: [SPARK-26211][SQL] Fix InSet for binary, and struct and ...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23176 LGTM, thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21004#discussion_r237093630 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala --- @@ -126,35 +126,32 @@ abstract class PartitioningAwareFileIndex( val caseInsensitiveOptions = CaseInsensitiveMap(parameters) val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION) .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone) - -userPartitionSchema match { +val inferredPartitionSpec = PartitioningUtils.parsePartitions( + leafDirs, + typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled, --- End diff -- actually the investigation was done by the reporter of SPARK-26188, I did nothing... Thanks for doing that @gengliangwang and thanks for your comment @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237075431 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { +case a: Alias => sameResult(a.child) +case _ => this.semanticEquals(other) --- End diff -- well, it needs to be overridden by `HashPartitioning` too, and since I am not able to make it final anyway, I don't think it is a good idea. I could add a match on `HashPartitioning` too, but that doesn't seem a clean solution to me. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237073129 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { +case a: Alias => sameResult(a.child) +case _ => this.semanticEquals(other) --- End diff -- I think it is doable, but I didn't want to put too many `match` where it was not needed. But if you prefer that way I can try and do that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237070496 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { --- End diff -- Sure, thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23153: [SPARK-26147][SQL] only pull out unevaluable python udf ...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23153 a late LGTM as well, thanks @cloud-fan for the patch and thanks @xuanyuanking for the review. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237068718 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { --- End diff -- Removing `Alias` is not possible for the reason explained in https://github.com/apache/spark/pull/22957#issuecomment-436992955. In general, `semanticEquals` should be used when we want to replace one expression with another, while `sameResult` should be used to check that 2 expressions return the same output. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237062190 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { --- End diff -- that is reasonable but it doesn't solve the problem stated in the JIRA. The goal here is to avoid that something like `a as b` is considered different from `a` in terms of ordering/distribution. If we just erased the alias name, the 2 expressions would still be different, because the presence of the `Alias` node itself would make them differ. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
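To make the intended semantics concrete, a small sketch (using the catalyst DSL; `sameResult` is the method proposed in this PR rather than existing API, so it is only shown commented out):

```scala
object SameResultSketch extends App {
  import org.apache.spark.sql.catalyst.dsl.expressions._
  import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}

  val a: Expression = 'a.int                  // AttributeReference "a"
  val aliased: Expression = Alias(a, "b")()   // a AS b

  // false: even though canonicalization erases the alias *name*, the Alias
  // node itself still makes the two trees differ.
  println(a.semanticEquals(aliased))

  // With the proposed method, the Alias is unwrapped before comparing, so the
  // two would be considered equivalent for ordering/distribution purposes:
  // println(a.sameResult(aliased))           // true
}
```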
[GitHub] spark issue #22947: [SPARK-24913][SQL] Make AssertNotNull and AssertTrue non...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/22947 any more comments on this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22957: [SPARK-25951][SQL] Ignore aliases for distributions and ...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/22957 cc @cloud-fan @gatorsmile --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23057: [SPARK-26078][SQL] Dedup self-join attributes on IN subq...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23057 cc @gatorsmile too --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22764: [SPARK-25765][ML] Add training cost to BisectingKMeans s...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/22764 @dbtsai any luck with this? Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23124 LGTM too --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21004#discussion_r236998030 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala --- @@ -126,35 +126,32 @@ abstract class PartitioningAwareFileIndex( val caseInsensitiveOptions = CaseInsensitiveMap(parameters) val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION) .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone) - -userPartitionSchema match { +val inferredPartitionSpec = PartitioningUtils.parsePartitions( + leafDirs, + typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled, --- End diff -- this is causing a behavior change in Spark 2.4.0 reported in SPARK-26188. Why did we need this change? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23153: [SPARK-26147][SQL] only pull out unevaluable python udf ...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23153 the change itself seems fine to me. As @xuanyuanking mentioned, though, we should update the existing tests. What about adding a test in the new suite checking the plans, instead of an end-to-end test? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23057: [SPARK-26078][SQL] Dedup self-join attributes on IN subq...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23057 any comments @cloud-fan ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r236171035 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -646,34 +633,35 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres val mapMerge = s""" -|${ev.isNull} = $hasNullName; -|if (!${ev.isNull}) { -| $arrayDataClass[] $keyArgsName = new $arrayDataClass[${mapCodes.size}]; -| $arrayDataClass[] $valArgsName = new $arrayDataClass[${mapCodes.size}]; -| long $numElementsName = 0; -| for (int $idxName = 0; $idxName < $argsName.length; $idxName++) { -|$keyArgsName[$idxName] = $argsName[$idxName].keyArray(); -|$valArgsName[$idxName] = $argsName[$idxName].valueArray(); -|$numElementsName += $argsName[$idxName].numElements(); -| } -| if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) { -|throw new RuntimeException("Unsuccessful attempt to concat maps with " + -| $numElementsName + " elements due to exceeding the map size limit " + -| "${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}."); -| } -| $arrayDataClass $finKeysName = $keyConcat($keyArgsName, -|(int) $numElementsName); -| $arrayDataClass $finValsName = $valueConcat($valArgsName, -|(int) $numElementsName); -| ${ev.value} = new $arrayBasedMapDataClass($finKeysName, $finValsName); +|ArrayData[] $keyArgsName = new ArrayData[${mapCodes.size}]; +|ArrayData[] $valArgsName = new ArrayData[${mapCodes.size}]; +|long $numElementsName = 0; +|for (int $idxName = 0; $idxName < $argsName.length; $idxName++) { +| $keyArgsName[$idxName] = $argsName[$idxName].keyArray(); +| $valArgsName[$idxName] = $argsName[$idxName].valueArray(); +| $numElementsName += $argsName[$idxName].numElements(); |} +|if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) { --- End diff -- I see, I agree doing it in a followup, thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r236170759 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres override def nullable: Boolean = children.exists(_.nullable) + private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType) + override def eval(input: InternalRow): Any = { -val maps = children.map(_.eval(input)) +val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray --- End diff -- Yes, but converting `toArray` may require an extra O(N) operation for the copy, so I am not sure the difference between `while` and `foreach` is significant enough to cover the overhead of the copy... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r235932502 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres override def nullable: Boolean = children.exists(_.nullable) + private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType) + override def eval(input: InternalRow): Any = { -val maps = children.map(_.eval(input)) +val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray --- End diff -- well, my understanding is that we could do a `maps.foreach` instead of accessing them by index. I don't see the index to be significant at all, but maybe I am missing something... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r235931894 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -646,34 +633,35 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres val mapMerge = s""" -|${ev.isNull} = $hasNullName; -|if (!${ev.isNull}) { -| $arrayDataClass[] $keyArgsName = new $arrayDataClass[${mapCodes.size}]; -| $arrayDataClass[] $valArgsName = new $arrayDataClass[${mapCodes.size}]; -| long $numElementsName = 0; -| for (int $idxName = 0; $idxName < $argsName.length; $idxName++) { -|$keyArgsName[$idxName] = $argsName[$idxName].keyArray(); -|$valArgsName[$idxName] = $argsName[$idxName].valueArray(); -|$numElementsName += $argsName[$idxName].numElements(); -| } -| if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) { -|throw new RuntimeException("Unsuccessful attempt to concat maps with " + -| $numElementsName + " elements due to exceeding the map size limit " + -| "${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}."); -| } -| $arrayDataClass $finKeysName = $keyConcat($keyArgsName, -|(int) $numElementsName); -| $arrayDataClass $finValsName = $valueConcat($valArgsName, -|(int) $numElementsName); -| ${ev.value} = new $arrayBasedMapDataClass($finKeysName, $finValsName); +|ArrayData[] $keyArgsName = new ArrayData[${mapCodes.size}]; +|ArrayData[] $valArgsName = new ArrayData[${mapCodes.size}]; +|long $numElementsName = 0; +|for (int $idxName = 0; $idxName < $argsName.length; $idxName++) { +| $keyArgsName[$idxName] = $argsName[$idxName].keyArray(); +| $valArgsName[$idxName] = $argsName[$idxName].valueArray(); +| $numElementsName += $argsName[$idxName].numElements(); |} +|if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) { --- End diff -- yes, but we could do the putAll before and eventually fail when we reach the limit. We can maybe do that in a followup, though, as it is not introducing any regression.. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r235931588 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType} + +/** + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes + * duplicated map keys w.r.t. the last wins policy. + */ +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable { + assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map") + + private lazy val keyToIndex = keyType match { +case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int] +case _ => + // for complex types, use interpreted ordering to be able to compare unsafe data with safe + // data, e.g. UnsafeRow vs GenericInternalRow. + mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType)) + } + + // TODO: specialize it + private lazy val keys = mutable.ArrayBuffer.empty[Any] + private lazy val values = mutable.ArrayBuffer.empty[Any] + + private lazy val keyGetter = InternalRow.getAccessor(keyType) + private lazy val valueGetter = InternalRow.getAccessor(valueType) + + def reset(): Unit = { +keyToIndex.clear() +keys.clear() +values.clear() + } + + def put(key: Any, value: Any): Unit = { +if (key == null) { + throw new RuntimeException("Cannot use null as map key.") +} + +val maybeExistingIdx = keyToIndex.get(key) +if (maybeExistingIdx.isDefined) { + // Overwrite the previous value, as the policy is last wins. + values(maybeExistingIdx.get) = value +} else { + keyToIndex.put(key, values.length) + keys.append(key) + values.append(value) +} + } + + // write a 2-field row, the first field is key and the second field is value. + def put(entry: InternalRow): Unit = { +if (entry.isNullAt(0)) { --- End diff -- Oh I see now, I missed it, thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r235866111 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -646,34 +633,35 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres val mapMerge = s""" -|${ev.isNull} = $hasNullName; -|if (!${ev.isNull}) { -| $arrayDataClass[] $keyArgsName = new $arrayDataClass[${mapCodes.size}]; -| $arrayDataClass[] $valArgsName = new $arrayDataClass[${mapCodes.size}]; -| long $numElementsName = 0; -| for (int $idxName = 0; $idxName < $argsName.length; $idxName++) { -|$keyArgsName[$idxName] = $argsName[$idxName].keyArray(); -|$valArgsName[$idxName] = $argsName[$idxName].valueArray(); -|$numElementsName += $argsName[$idxName].numElements(); -| } -| if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) { -|throw new RuntimeException("Unsuccessful attempt to concat maps with " + -| $numElementsName + " elements due to exceeding the map size limit " + -| "${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}."); -| } -| $arrayDataClass $finKeysName = $keyConcat($keyArgsName, -|(int) $numElementsName); -| $arrayDataClass $finValsName = $valueConcat($valArgsName, -|(int) $numElementsName); -| ${ev.value} = new $arrayBasedMapDataClass($finKeysName, $finValsName); +|ArrayData[] $keyArgsName = new ArrayData[${mapCodes.size}]; +|ArrayData[] $valArgsName = new ArrayData[${mapCodes.size}]; +|long $numElementsName = 0; +|for (int $idxName = 0; $idxName < $argsName.length; $idxName++) { +| $keyArgsName[$idxName] = $argsName[$idxName].keyArray(); +| $valArgsName[$idxName] = $argsName[$idxName].valueArray(); +| $numElementsName += $argsName[$idxName].numElements(); |} +|if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) { --- End diff -- this check is not really correct, as we are not considering duplicates IIUC. I think we can change this behavior using `putAll` and checking the size in the loop. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
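A self-contained toy (deliberately not Spark's ArrayBasedMapBuilder) illustrating the point being made: enforce the size limit on the number of distinct keys inside `put`, after de-duplication, rather than on the summed sizes of the inputs before duplicates are merged:

```scala
import scala.collection.mutable

class DedupMapBuilder[K, V](sizeLimit: Int) {
  private val keyToIndex = mutable.HashMap.empty[K, Int]
  private val keys = mutable.ArrayBuffer.empty[K]
  private val values = mutable.ArrayBuffer.empty[V]

  def put(key: K, value: V): Unit = keyToIndex.get(key) match {
    case Some(idx) =>
      values(idx) = value                 // last wins, distinct-key count unchanged
    case None =>
      if (keys.length + 1 > sizeLimit) {  // check against the de-duplicated size
        throw new RuntimeException(s"Map would exceed the size limit $sizeLimit.")
      }
      keyToIndex.put(key, keys.length)
      keys += key
      values += value
  }

  def build(): Seq[(K, V)] = keys.zip(values)
}

object DedupMapBuilderDemo extends App {
  val b = new DedupMapBuilder[Int, String](sizeLimit = 2)
  b.put(1, "a"); b.put(1, "b"); b.put(2, "c")  // only 2 distinct keys, so no error
  println(b.build())                           // ArrayBuffer((1,b), (2,c))
}
```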
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r235865179 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres override def nullable: Boolean = children.exists(_.nullable) + private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType) + override def eval(input: InternalRow): Any = { -val maps = children.map(_.eval(input)) +val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray --- End diff -- why do we need `toArray` here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r235867070 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType} + +/** + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes + * duplicated map keys w.r.t. the last wins policy. + */ +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable { + assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map") + + private lazy val keyToIndex = keyType match { +case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int] +case _ => + // for complex types, use interpreted ordering to be able to compare unsafe data with safe + // data, e.g. UnsafeRow vs GenericInternalRow. + mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType)) + } + + // TODO: specialize it + private lazy val keys = mutable.ArrayBuffer.empty[Any] + private lazy val values = mutable.ArrayBuffer.empty[Any] + + private lazy val keyGetter = InternalRow.getAccessor(keyType) + private lazy val valueGetter = InternalRow.getAccessor(valueType) + + def reset(): Unit = { +keyToIndex.clear() +keys.clear() +values.clear() + } + + def put(key: Any, value: Any): Unit = { +if (key == null) { + throw new RuntimeException("Cannot use null as map key.") +} + +val maybeExistingIdx = keyToIndex.get(key) +if (maybeExistingIdx.isDefined) { + // Overwrite the previous value, as the policy is last wins. + values(maybeExistingIdx.get) = value +} else { + keyToIndex.put(key, values.length) + keys.append(key) + values.append(value) +} + } + + // write a 2-field row, the first field is key and the second field is value. + def put(entry: InternalRow): Unit = { +if (entry.isNullAt(0)) { --- End diff -- this is checked only here and not in all the other put...I think we should be consistent and either check it always or never do it.. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23104: [SPARK-26138][SQL] LimitPushDown cross join requires may...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23104 @guoxiaolongzte still that doesn't explain why we can push to the right side too. I do believe that it is possible. If the right side contains more than N items, where N is the limit size, the output will contain the combinations of the first item from the left side and the first N items from the right side. If the right side contains fewer than N items, pushing the limit to its side has no effect on the result. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
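A tiny plain-Scala check of the argument above (no Spark involved; names are illustrative): limiting either input of a cartesian product to N rows still leaves at least min(N, total) rows, so a LIMIT N on top, which promises no particular rows without an ORDER BY, can always be satisfied:

```scala
object CrossJoinLimitDemo extends App {
  def cross[A, B](l: Seq[A], r: Seq[B]): Seq[(A, B)] =
    for { a <- l; b <- r } yield (a, b)

  val left  = Seq(1, 2, 3)
  val right = Seq("a", "b", "c", "d")
  val n = 3

  val fromFullInputs   = cross(left, right).take(n)
  val fromPrunedInputs = cross(left.take(n), right.take(n)).take(n)

  // Both honor the limit; they just may pick different (equally valid) rows.
  assert(fromFullInputs.size == n && fromPrunedInputs.size == n)
  println(fromPrunedInputs)   // List((1,a), (1,b), (1,c))
}
```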
[GitHub] spark issue #22875: [SPARK-25867][ML] Remove KMeans computeCost
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/22875 sure, thanks @srowen , no need to apologize at all, thanks for your help reviewing this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22875: [SPARK-25867][ML] Remove KMeans computeCost
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/22875 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23104: [SPARK-26138][SQL] LimitPushDown cross join requi...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23104#discussion_r235392255 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -459,6 +459,7 @@ object LimitPushDown extends Rule[LogicalPlan] { val newJoin = joinType match { case RightOuter => join.copy(right = maybePushLocalLimit(exp, right)) case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left)) +case Cross => join.copy(left = maybePushLocalLimit(exp, left)) --- End diff -- what if the other table is empty? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22875: [SPARK-25867][ML] Remove KMeans computeCost
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/22875 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23057: [SPARK-26078][SQL] Dedup self-join attributes on IN subq...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/23057 thanks @viirya , I added a comment --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22875: [SPARK-25867][ML] Remove KMeans computeCost
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/22875 thanks for your review @srowen --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22875: [SPARK-25867][ML] Remove KMeans computeCost
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/22875 Yes, it was deprecated in #22756, but that deprecation only takes effect in 3.0, so we cannot remove it in 3.0... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22875: [SPARK-25867][ML] Remove KMeans computeCost
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/22875 Yes, the problem is that there is also the `computeCost` of `BisectingKMeans`. I proposed to deprecate it in 2.4 and remove in 3.0, but I didn't manage to have it done for 2.4 (please refer to the discussions on https://github.com/apache/spark/pull/22764 and https://github.com/apache/spark/pull/22756 for the details). So `computeCost` of `BisectingKMeans` cannot be removed in 3.0 (unfortunately). The examples for KMeans have already been updated using ClusteringEvaluator in #19676. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
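For reference, a sketch of evaluating KMeans with ClusteringEvaluator instead of computeCost, closely following the pattern of the existing Spark example; the data path and parameter values here are illustrative, and note that the silhouette score is a different metric from the within-set sum of squared errors:

```scala
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.sql.SparkSession

object KMeansSilhouetteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("demo").master("local[*]").getOrCreate()
    // sample_kmeans_data.txt ships with the Spark distribution
    val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

    val model = new KMeans().setK(2).setSeed(1L).fit(dataset)
    val predictions = model.transform(dataset)

    // ClusteringEvaluator computes the silhouette score of the predictions.
    val silhouette = new ClusteringEvaluator().evaluate(predictions)
    println(s"Silhouette with squared euclidean distance = $silhouette")

    spark.stop()
  }
}
```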
[GitHub] spark pull request #23093: [SPARK-26127][ML] Remove deprecated setImpurity f...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/23093#discussion_r235016727 --- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala --- @@ -91,7 +91,7 @@ class DecisionTreeClassifier @Since("1.4.0") ( /** @group setParam */ @Since("1.4.0") - override def setImpurity(value: String): this.type = set(impurity, value) + def setImpurity(value: String): this.type = set(impurity, value) --- End diff -- I don't think those can be removed. The ones which were deprecated are the `setImpurity` on the `Model`s, not on the `Classifier`s/`Regressor`s which build the models. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22875: [SPARK-25867][ML] Remove KMeans computeCost
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/22875 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org