[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238683833 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala --- @@ -535,4 +535,98 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers with PlanTestB assert(unsafeRow.getSizeInBytes == 8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + roundedSize(field2.getSizeInBytes)) } + + testBothCodegenAndInterpreted("SPARK-25374 converts back into safe representation") { +def convertBackToInternalRow(inputRow: InternalRow, fields: Array[DataType]): InternalRow = { + val unsafeProj = UnsafeProjection.create(fields) + val unsafeRow = unsafeProj(inputRow) + val safeProj = SafeProjection.create(fields) + safeProj(unsafeRow) +} + +// Simple tests +val inputRow = InternalRow.fromSeq(Seq( + false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, UTF8String.fromString("test"), + Decimal(255), CalendarInterval.fromString("interval 1 day"), Array[Byte](1, 2) +)) +val fields1 = Array( + BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, + DoubleType, StringType, DecimalType.defaultConcreteType, CalendarIntervalType, + BinaryType) + +assert(convertBackToInternalRow(inputRow, fields1) === inputRow) + +// Array tests +val arrayRow = InternalRow.fromSeq(Seq( + createArray(1, 2, 3), + createArray( +createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*), +createArray(Seq("d").map(UTF8String.fromString): _*)) +)) +val fields2 = Array[DataType]( + ArrayType(IntegerType), + ArrayType(ArrayType(StringType))) + +assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow) + +// Struct tests +val structRow = InternalRow.fromSeq(Seq( + InternalRow.fromSeq(Seq[Any](1, 4.0)), + InternalRow.fromSeq(Seq( +UTF8String.fromString("test"), +InternalRow.fromSeq(Seq( + 1, + createArray(Seq("2", "3").map(UTF8String.fromString): _*) +)) + )) +)) +val fields3 = Array[DataType]( + StructType( 
+StructField("c0", IntegerType) :: +StructField("c1", DoubleType) :: +Nil), + StructType( +StructField("c2", StringType) :: +StructField("c3", StructType( + StructField("c4", IntegerType) :: + StructField("c5", ArrayType(StringType)) :: + Nil)) :: +Nil)) + +assert(convertBackToInternalRow(structRow, fields3) === structRow) + +// Map tests +val mapRow = InternalRow.fromSeq(Seq( + createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2), + createMap( +createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*), +createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*) + )( +createMap(Seq("k3", "k4").map(UTF8String.fromString): _*)(3.toShort, 4.toShort), +createMap(Seq("k5", "k6").map(UTF8String.fromString): _*)(5.toShort, 6.toShort) + ))) +val fields4 = Array[DataType]( + MapType(StringType, IntegerType), + MapType(MapType(IntegerType, StringType), MapType(StringType, ShortType))) + +val mapResultRow = convertBackToInternalRow(mapRow, fields4).toSeq(fields4) +val mapExpectedRow = mapRow.toSeq(fields4) +// Since `ArrayBasedMapData` does not override `equals` and `hashCode`, --- End diff -- `ArrayBasedMapData`/`UnsafeMapData` do not implement `equals()` or `hashCode()` because we do not have a good story around map equality. Implementing `equals()`/`hashCode()` for maps is only half of the solution; we would also need a comparable binary format. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
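A minimal, Spark-free sketch of why map equality is awkward. The `ArrayMap` class below is a hypothetical stand-in for an array-backed map (keys and values in parallel arrays); it is not Spark's `ArrayBasedMapData`. Without an `equals` override, two instances with identical contents compare by reference, and a layout-level (element-by-element or byte-by-byte) comparison is sensitive to entry order, so only a semantic comparison treats reordered entries as equal.

```scala
// Hypothetical stand-in for an array-backed map (keys and values in parallel
// arrays); it is not Spark's ArrayBasedMapData and has no equals override.
final class ArrayMap(val keys: Array[Int], val values: Array[String]) {
  // Semantic view of the entries, insensitive to their order in the arrays.
  def toScalaMap: Map[Int, String] = keys.iterator.zip(values.iterator).toMap
}

object MapEqualityDemo {
  val a = new ArrayMap(Array(1, 2), Array("x", "y"))
  val b = new ArrayMap(Array(1, 2), Array("x", "y"))
  val c = new ArrayMap(Array(2, 1), Array("y", "x")) // same entries, other order

  // Default equality is reference identity, even for identical contents.
  val sameReference: Boolean = a == b
  // Arrays also compare by reference, so comparing the fields with == fails too.
  val sameArrayRefs: Boolean = a.keys == b.keys
  // Element-wise (layout) comparison works for a/b but is order-sensitive.
  val sameLayoutAB: Boolean = a.keys.sameElements(b.keys) && a.values.sameElements(b.values)
  val sameLayoutAC: Boolean = a.keys.sameElements(c.keys) && a.values.sameElements(c.values)
  // Comparing semantic Scala Maps ignores entry order.
  val sameEntriesAC: Boolean = a.toScalaMap == c.toScalaMap
}
```

This mirrors the workaround in the test above: convert to a comparable representation before asserting equality.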
[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and RDDScan...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/23127 Looks good. One more higher-level question that can also be addressed in a follow-up.
[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/23127#discussion_r236017398 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -406,14 +415,39 @@ trait BlockingOperatorWithCodegen extends CodegenSupport { override def limitNotReachedChecks: Seq[String] = Nil } +/** + * Leaf codegen node reading from a single RDD. + */ +trait InputRDDCodegen extends CodegenSupport { --- End diff -- Should we reconcile this with the codegen for `RowDataSourceScanExec`?
[GitHub] spark issue #23022: [SPARK-26038] Decimal toScalaBigInt/toJavaBigInteger for...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/23022 Merging to master. Thanks!
[GitHub] spark pull request #23096: [SPARK-26129][SQL] Instrumentation for per-query ...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/23096#discussion_r235159238 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala --- @@ -648,7 +648,11 @@ class SparkSession private( * @since 2.0.0 */ def sql(sqlText: String): DataFrame = { -Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText)) +val tracker = new QueryPlanningTracker --- End diff -- @dongjoon-hyun just out of curiosity, what would you like to disable here?
[GitHub] spark issue #23075: [SPARK-26084][SQL] Fixes unresolved AggregateExpression....
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/23075 Also backported to 2.3/2.4.
[GitHub] spark issue #23075: [SPARK-26084][SQL] Fixes unresolved AggregateExpression....
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/23075 Merging to master. Thanks!
[GitHub] spark issue #23075: [SPARK-26084][SQL] Fixes unresolved AggregateExpression....
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/23075 Let's see if this works :)
[GitHub] spark issue #23075: [SPARK-26084][SQL] Fixes unresolved AggregateExpression....
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/23075 ok to test
[GitHub] spark issue #23075: [SPARK-26084][SQL] Fixes unresolved AggregateExpression....
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/23075 Ok to test
[GitHub] spark issue #23018: [SPARK-26023][SQL] Dumping truncated plans and generated...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/23018 Merging to master. Thanks!
[GitHub] spark issue #23018: [SPARK-26023][SQL] Dumping truncated plans and generated...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/23018 retest this please
[GitHub] spark pull request #22961: [SPARK-25947][SQL] Reduce memory usage in Shuffle...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22961#discussion_r232061457 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala --- @@ -214,13 +214,24 @@ object ShuffleExchangeExec { override def getPartition(key: Any): Int = key.asInstanceOf[Int] } case RangePartitioning(sortingExpressions, numPartitions) => -// Internally, RangePartitioner runs a job on the RDD that samples keys to compute -// partition bounds. To get accurate samples, we need to copy the mutable keys. +// Extract only fields used for sorting to avoid collecting large fields that does not +// affect sorting result when deciding partition bounds in RangePartitioner val rddForSampling = rdd.mapPartitionsInternal { iter => + val projection = +UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes) val mutablePair = new MutablePair[InternalRow, Null]() - iter.map(row => mutablePair.update(row.copy(), null)) + // Internally, RangePartitioner runs a job on the RDD that samples keys to compute + // partition bounds. To get accurate samples, we need to copy the mutable keys. + iter.map(row => mutablePair.update(projection(row).copy(), null)) } -implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes) +// Construct ordering on extracted sort key. +val orderingAttributes = --- End diff -- This is a bit clunky IMO. Can we do this instead:
```scala
val orderingAttributes = sortingExpressions.zipWithIndex.map { case (ord, i) =>
  ord.copy(child = BoundReference(i, ord.dataType, ord.nullable))
}
```
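The suggestion above projects only the sort columns and then rebinds each sort order to its position in the projected key. A toy, Spark-free sketch of that idea follows; `InputColumn`, `BoundReference`, `SortOrder` and the helper functions are hypothetical simplifications (integer-only rows), not Catalyst's classes.

```scala
object SortKeyRebinding {
  // Hypothetical, heavily simplified expression model (not Catalyst's classes).
  sealed trait Expr { def eval(row: IndexedSeq[Int]): Int }

  // Reads column `index` of the original, possibly wide, input row.
  final case class InputColumn(index: Int) extends Expr {
    def eval(row: IndexedSeq[Int]): Int = row(index)
  }

  // Reads position `ordinal` of the projected sort-key row.
  final case class BoundReference(ordinal: Int) extends Expr {
    def eval(row: IndexedSeq[Int]): Int = row(ordinal)
  }

  final case class SortOrder(child: Expr, ascending: Boolean)

  // Step 1: keep only the values the sort needs, in sort-order position.
  def projectKey(orders: Seq[SortOrder], row: IndexedSeq[Int]): IndexedSeq[Int] =
    orders.map(_.child.eval(row)).toIndexedSeq

  // Step 2: point each sort order at its ordinal in the projected key.
  def rebind(orders: Seq[SortOrder]): Seq[SortOrder] =
    orders.zipWithIndex.map { case (ord, i) => ord.copy(child = BoundReference(i)) }

  // Step 3: a lexicographic ordering over projected keys.
  def ordering(bound: Seq[SortOrder]): Ordering[IndexedSeq[Int]] =
    new Ordering[IndexedSeq[Int]] {
      def compare(x: IndexedSeq[Int], y: IndexedSeq[Int]): Int = {
        val it = bound.iterator
        var result = 0
        while (result == 0 && it.hasNext) {
          val ord = it.next()
          val c = Integer.compare(ord.child.eval(x), ord.child.eval(y))
          result = if (ord.ascending) c else -c
        }
        result
      }
    }

  // Usage: sort by column 1 ascending, then column 0 descending; column 2 plays
  // the "large field" that never reaches the sampled keys.
  val orders = Seq(SortOrder(InputColumn(1), ascending = true), SortOrder(InputColumn(0), ascending = false))
  val rows: Seq[IndexedSeq[Int]] = Seq(Vector(10, 3, 999), Vector(20, 1, 999), Vector(30, 3, 0))
  val keys: Seq[IndexedSeq[Int]] = rows.map(projectKey(orders, _))
  val sortedKeys: Seq[IndexedSeq[Int]] = keys.sorted(ordering(rebind(orders)))
}
```

The design point is that the ordering never sees the original row layout, only ordinals 0..n-1 of the key.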
[GitHub] spark issue #22964: [SPARK-25963] Optimize generate followed by window
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22964 @uzadude where is this relevant? You will end up with two shuffles if you do this.
[GitHub] spark pull request #22932: [SPARK-25102][SQL] Write Spark version to ORC/Par...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22932#discussion_r230604337 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/package.scala --- @@ -44,4 +44,13 @@ package object sql { type Strategy = SparkStrategy type DataFrame = Dataset[Row] + + /** + * Metadata key which is used to write Spark version in the followings: + * - Parquet file metadata + * - ORC file metadata + * + * Note that Hive table property `spark.sql.create.version` also has Spark version. + */ + private[sql] val CREATE_VERSION = "org.apache.spark.sql.create.version" --- End diff -- Is this a pre-existing key? It seems that `org.apache.spark.version` should be enough.
[GitHub] spark issue #22925: [SPARK-25913][SQL] Extend UnaryExecNode by unary SparkPl...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22925 LGTM
[GitHub] spark issue #22789: [SPARK-25767][SQL] Fix lazily evaluated stream of expres...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22789 Merged to master/2.4
[GitHub] spark issue #22789: [SPARK-25767][SQL] Fix lazily evaluated stream of expres...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22789 LGTM
[GitHub] spark pull request #22789: [SPARK-25767][SQL] Fix lazily evaluated stream of...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22789#discussion_r228760802 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala --- @@ -319,4 +319,15 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { assert(df.limit(1).collect() === Array(Row("bat", 8.0))) } } + + test("SPARK-25767: Lazy evaluated stream of expressions handled correctly") { --- End diff -- Oh wait, I executed those commands in the Spark shell, which falls back to interpreted mode when compilation fails. That is what happened here. Sorry about the fuss.
[GitHub] spark pull request #22789: [SPARK-25767][SQL] Fix lazily evaluated stream of...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22789#discussion_r228749168 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -146,7 +146,10 @@ trait CodegenSupport extends SparkPlan { if (outputVars != null) { assert(outputVars.length == output.length) // outputVars will be used to generate the code for UnsafeRow, so we should copy them -outputVars.map(_.copy()) +outputVars.map(_.copy()) match { --- End diff -- What in `consume` is relying on a side effect of traversing the `outputVars`?
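The diff above pattern-matches on the result of `outputVars.map(_.copy())`. A minimal sketch of why that matters, assuming the copies carry a side effect: on a Scala `Stream`, `map` runs eagerly only for the head, so forcing the stream is one way to make every copy happen before code generation continues. `Var` and `materialize` are hypothetical names, not Spark's `ExprCode` or the patch's actual code.

```scala
object ForceOutputVars {
  // Hypothetical stand-in for a codegen variable whose copy() has a side
  // effect (here: appending to a log), mimicking state touched during traversal.
  final class Var(val name: String, log: scala.collection.mutable.ArrayBuffer[String]) {
    def copy(): Var = { log += name; new Var(name, log) }
  }

  // Force a lazily evaluated Stream so every element's side effect runs now;
  // strict collections are returned unchanged.
  def materialize[A](xs: Seq[A]): Seq[A] = xs match {
    case s: Stream[A @unchecked] => s.force
    case other                   => other
  }

  // Returns the log right after mapping, and again after materializing.
  def demo(): (List[String], List[String]) = {
    val log = scala.collection.mutable.ArrayBuffer.empty[String]
    val outputVars: Seq[Var] = Stream("a", "b", "c").map(new Var(_, log))
    val copied = outputVars.map(_.copy()) // Stream.map: only the head is copied
    val afterMap = log.toList
    materialize(copied)                   // forces the copies of "b" and "c"
    (afterMap, log.toList)
  }
}
```

`Stream` is deprecated in Scala 2.13 in favor of `LazyList`, which is lazy in its head as well, so the exact behavior shown here depends on using `Stream`.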
[GitHub] spark pull request #22789: [SPARK-25767][SQL] Fix lazily evaluated stream of...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22789#discussion_r228748979 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala --- @@ -319,4 +319,15 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { assert(df.limit(1).collect() === Array(Row("bat", 8.0))) } } + + test("SPARK-25767: Lazy evaluated stream of expressions handled correctly") { --- End diff -- Even without this patch, this test would pass (I just tried it on master). A stream always evaluates its first element, so you probably need to add another key here.
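A small sketch of the point about the first element, assuming Scala's (deprecated) `Stream`: `map` evaluates the head immediately and defers the rest. With a single key, the lazy and strict cases look identical, so a one-key test cannot detect the bug; with several keys, only one side effect has run before the stream is forced. `sideEffects` is a hypothetical helper for illustration.

```scala
object StreamHeadStrictness {
  // Counts how many side effects a Stream.map has run (a) right after map and
  // (b) after forcing the whole stream.
  def sideEffects(keys: Seq[String]): (Int, Int) = {
    var evaluated = 0
    val mapped = keys.toStream.map { k => evaluated += 1; k.length }
    val beforeForce = evaluated
    mapped.force
    (beforeForce, evaluated)
  }
}
```

For one key this yields `(1, 1)`; for three keys it yields `(1, 3)`, which is the difference a multi-key test would expose.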
[GitHub] spark issue #22789: [SPARK-25767][SQL] Fix lazily evaluated stream of expres...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22789 ok to test
[GitHub] spark issue #22822: [SPARK-25678] Requesting feedback regarding a prototype ...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22822 @UtkarshMe well, there is signal in the lack of responsiveness. Adding and maintaining cluster managers has proven to be quite painful; case in point is the lack of love that Mesos is receiving. I don't really see a way forward here unless there is strong consensus in the community.
[GitHub] spark issue #22822: [SPARK-25678] Requesting feedback regarding a prototype ...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22822 @UtkarshMe you should reach out to the Spark dev list about this.
[GitHub] spark issue #22817: [SPARK-25816][SQL] ResolveReferences should work bottom-...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22817 ok to test
[GitHub] spark issue #22817: [SPARK-25816][SQL] ResolveReferences should work bottom-...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22817 @peter-toth what are you trying to fix here? Could you add this to the PR description?
[GitHub] spark pull request #22576: [SPARK-25560][SQL] Allow FunctionInjection in Spa...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22576#discussion_r226623886 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala --- @@ -168,4 +173,21 @@ class SparkSessionExtensions { def injectParser(builder: ParserBuilder): Unit = { parserBuilders += builder } + + private[this] val injectedFunctions = mutable.Buffer.empty[FunctionDescription] + + private[sql] def registerFunctions(functionRegistry: FunctionRegistry) = { +for ((name, expressionInfo, function) <- injectedFunctions) { --- End diff -- 👍
[GitHub] spark issue #22576: [SPARK-25560][SQL] Allow FunctionInjection in SparkExten...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22576 @RussellSpitzer I am merging this. Can you address my comment in a follow-up? Thanks!
[GitHub] spark pull request #22576: [SPARK-25560][SQL] Allow FunctionInjection in Spa...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22576#discussion_r226571338 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala --- @@ -168,4 +173,21 @@ class SparkSessionExtensions { def injectParser(builder: ParserBuilder): Unit = { parserBuilders += builder } + + private[this] val injectedFunctions = mutable.Buffer.empty[FunctionDescription] + + private[sql] def registerFunctions(functionRegistry: FunctionRegistry) = { +for ((name, expressionInfo, function) <- injectedFunctions) { --- End diff -- Can you move the stuff that changes the `FunctionRegistry` into the `BaseSessionStateBuilder` and just make this return the `Seq[FunctionDescription]`? The return type of this function, a `FunctionRegistry`, sort of implies that you are getting back a new registry instead of a mutated one. If we are mutating, then I prefer to do that in the `BaseSessionStateBuilder`, so it is obvious that this is safe to do because we are mutating a clone. It also makes this code more in line with the rest of the extension class (not mutating). Sorry for the late change of heart.
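A minimal sketch of the split suggested above, under simplified assumptions: the extensions object only collects and returns function descriptions, and a separate builder step clones the built-in registry and registers them on the clone. `Registry`, `Extensions`, `buildRegistry` and this `FunctionDescription` case class are hypothetical simplifications, not Spark's `FunctionRegistry` or `SparkSessionExtensions` API.

```scala
import scala.collection.mutable

object FunctionInjectionSketch {
  // Hypothetical simplified types: Spark's FunctionDescription is a tuple of
  // identifier, ExpressionInfo and builder; here a builder is just Seq[Int] => Int.
  type Builder = Seq[Int] => Int
  final case class FunctionDescription(name: String, builder: Builder)

  final class Registry private (fns: mutable.Map[String, Builder]) {
    def this() = this(mutable.Map.empty[String, Builder])
    def register(name: String, f: Builder): Unit = fns(name) = f
    def lookup(name: String): Option[Builder] = fns.get(name)
    def cloneRegistry(): Registry = new Registry(fns.clone())
  }

  // The extensions object only collects and returns descriptions; it never
  // mutates a registry itself.
  final class Extensions {
    private[this] val injected = mutable.Buffer.empty[FunctionDescription]
    def injectFunction(d: FunctionDescription): Unit = injected += d
    def registeredFunctions: Seq[FunctionDescription] = injected.toList
  }

  // The session-state builder owns the mutation and applies it to a clone,
  // so the shared built-in registry is never touched.
  def buildRegistry(builtin: Registry, ext: Extensions): Registry = {
    val registry = builtin.cloneRegistry()
    ext.registeredFunctions.foreach(d => registry.register(d.name, d.builder))
    registry
  }

  // Usage
  val builtin = new Registry()
  builtin.register("sum", _.sum)
  val extensions = new Extensions
  extensions.injectFunction(FunctionDescription("my_max", _.max))
  val sessionRegistry: Registry = buildRegistry(builtin, extensions)
}
```

Keeping the mutation in one place makes it easy to see that only a per-session clone is modified.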
[GitHub] spark pull request #22712: [SPARK-25724] Add sorting functionality in MapTyp...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22712#discussion_r224957118 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala --- @@ -73,6 +74,90 @@ case class MapType( override private[spark] def existsRecursively(f: (DataType) => Boolean): Boolean = { f(this) || keyType.existsRecursively(f) || valueType.existsRecursively(f) } + + private[this] class OrderedWrapper { +var isOrdered: Boolean = false --- End diff -- I would prefer not to make this mutable if we can avoid it. That can be a source of some pretty weird errors if we move from an unordered to an ordered map. Why do you need this?
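One immutable alternative to a mutable `isOrdered` flag, sketched under the assumption that "ordered" can simply be part of the type's value: deriving an ordered variant returns a new instance, so anything still holding the unordered instance is unaffected. `MapTypeSketch` and `withOrdering` are hypothetical names, not Spark's `MapType`.

```scala
object ImmutableOrderedFlag {
  // Hypothetical simplified map-type descriptor; not Spark's MapType.
  final case class MapTypeSketch(keyType: String, valueType: String, isOrdered: Boolean = false) {
    // Produce a new descriptor instead of flipping a shared mutable flag, so
    // code still holding the unordered instance keeps seeing isOrdered == false.
    def withOrdering: MapTypeSketch = copy(isOrdered = true)
  }

  val unordered = MapTypeSketch("int", "string")
  val ordered = unordered.withOrdering
}
```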
[GitHub] spark pull request #22696: [SPARK-25708][SQL] HAVING without GROUP BY means ...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22696#discussion_r224590474 --- Diff: docs/sql-programming-guide.md --- @@ -1894,6 +1894,8 @@ working with timestamps in `pandas_udf`s to get the best performance, see - In PySpark, when creating a `SparkSession` with `SparkSession.builder.getOrCreate()`, if there is an existing `SparkContext`, the builder was trying to update the `SparkConf` of the existing `SparkContext` with configurations specified to the builder, but the `SparkContext` is shared by all `SparkSession`s, so we should not update them. Since 3.0, the builder come to not update the configurations. This is the same behavior as Java/Scala API in 2.3 and above. If you want to update them, you need to update them prior to creating a `SparkSession`. + - In Spark version 2.4 and earlier, HAVING without GROUP BY is treated as WHERE. This means, `SELECT 1 FROM range(10) HAVING true` is executed as `SELECT 1 FROM range(10) WHERE true` and returns 10 rows. This violates SQL standard, and has been fixed in Spark 3.0. Since Spark 3.0, HAVING without GROUP BY is treated as a global aggregate, which means `SELECT 1 FROM range(10) HAVING true` will return only one row. --- End diff -- You will need to put this behind a feature flag if you port it to 2.4; people might rely on its current behavior.
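A toy model of the two semantics in the migration note, with the choice guarded by a flag as the review suggests for a 2.4 backport. `legacyHavingAsWhere` is a hypothetical parameter used only for illustration, not a real Spark configuration key, and the function models only `SELECT 1 FROM range(n) HAVING <predicate>`.

```scala
object HavingWithoutGroupBySketch {
  // Models `SELECT 1 FROM range(n) HAVING <predicate>` on a toy level.
  // `legacyHavingAsWhere` is a hypothetical flag, not a real Spark config key.
  def selectOneHaving(n: Int, predicate: Boolean, legacyHavingAsWhere: Boolean): Seq[Int] =
    if (legacyHavingAsWhere) {
      // 2.4 behavior: HAVING acts like WHERE, so each input row may produce a row.
      (0 until n).filter(_ => predicate).map(_ => 1)
    } else {
      // 3.0 behavior: HAVING implies a global aggregate with exactly one group,
      // which the predicate then keeps or drops.
      if (predicate) Seq(1) else Seq.empty
    }
}
```

With `n = 10` and a true predicate, the legacy branch returns 10 rows and the new branch returns one, matching the example in the doc change.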
[GitHub] spark issue #22696: [SPARK-25708][SQL] HAVING without GROUP BY means global ...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22696 I added the release-notes label to the JIRA ticket. I am not sure if there is a migration-guide label.
[GitHub] spark pull request #22576: [SPARK-25560][SQL] Allow FunctionInjection in Spa...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22576#discussion_r224366907 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala --- @@ -168,4 +173,22 @@ class SparkSessionExtensions { def injectParser(builder: ParserBuilder): Unit = { parserBuilders += builder } + + private[this] val injectedFunctions = --- End diff -- NIT: no new line?
[GitHub] spark pull request #22576: [SPARK-25560][SQL] Allow FunctionInjection in Spa...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22576#discussion_r224366774 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala --- @@ -95,7 +95,8 @@ abstract class BaseSessionStateBuilder( * This either gets cloned from a pre-existing version or cloned from the built-in registry. */ protected lazy val functionRegistry: FunctionRegistry = { - parentState.map(_.functionRegistry).getOrElse(FunctionRegistry.builtin).clone() +parentState.map(_.functionRegistry.clone()) + .getOrElse{extensions.registerFunctions(FunctionRegistry.builtin.clone())} --- End diff -- Use parentheses instead of curly braces?
[GitHub] spark pull request #22576: [SPARK-25560][SQL] Allow FunctionInjection in Spa...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22576#discussion_r224364692 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala --- @@ -168,4 +173,22 @@ class SparkSessionExtensions { def injectParser(builder: ParserBuilder): Unit = { parserBuilders += builder } + + private[this] val injectedFunctions = +mutable.Buffer.empty[FunctionDescription] + +private[sql] def registerFunctions(functionRegistry: FunctionRegistry) = { --- End diff -- Can you fix the indentation for the next 14 lines?
[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22429#discussion_r223983702 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala --- @@ -189,23 +192,34 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { """.stripMargin.trim } + private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = +try f(writer) catch { case e: AnalysisException => writer.write(e.toString) } --- End diff -- Please use multiple lines here.
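One way the one-liner from the diff could look when spread over multiple lines, as requested. To keep the sketch self-contained, `AnalysisException` here is a local stand-in class, not Spark's, and `demo` is a hypothetical helper showing both the normal and the error path.

```scala
import java.io.{StringWriter, Writer}

object WriteOrErrorSketch {
  // Stand-in for Spark's AnalysisException so the sketch compiles on its own.
  final class AnalysisException(message: String) extends Exception(message)

  // The one-liner from the diff, spread over multiple lines.
  def writeOrError(writer: Writer)(f: Writer => Unit): Unit = {
    try {
      f(writer)
    } catch {
      case e: AnalysisException => writer.write(e.toString)
    }
  }

  // Returns what was written on the success path and on the failure path.
  def demo(): (String, String) = {
    val ok = new StringWriter()
    writeOrError(ok)(_.write("== Parsed Logical Plan =="))
    val failed = new StringWriter()
    writeOrError(failed)(_ => throw new AnalysisException("cannot resolve 'x'"))
    (ok.toString, failed.toString)
  }
}
```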
[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22429#discussion_r223983537 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala --- @@ -167,6 +172,58 @@ package object util { builder.toString() } + /** + * The performance overhead of creating and logging strings for wide schemas can be large. To + * limit the impact, we bound the number of fields to include by default. This can be overridden + * by setting the 'spark.debug.maxToStringFields' conf in SparkEnv or by settings the SQL config + * `spark.sql.debug.maxToStringFields`. + */ + private[spark] def maxNumToStringFields: Int = { +val legacyLimit = if (SparkEnv.get != null) { + SparkEnv.get.conf.get(config.MAX_TO_STRING_FIELDS) +} else { + config.MAX_TO_STRING_FIELDS.defaultValue.get +} +val sqlConfLimit = SQLConf.get.maxToStringFields + +Math.max(sqlConfLimit, legacyLimit) + } + + /** Whether we have warned about plan string truncation yet. */ + private val truncationWarningPrinted = new AtomicBoolean(false) + + /** + * Format a sequence with semantics similar to calling .mkString(). Any elements beyond + * maxNumToStringFields will be dropped and replaced by a "... N more fields" placeholder. + * + * @return the trimmed and formatted string. + */ + def truncatedString[T]( + seq: Seq[T], + start: String, + sep: String, + end: String, + maxFields: Option[Int]): String = { +val maxNumFields = maxFields.getOrElse(maxNumToStringFields) --- End diff -- You should document this behavior.
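A sketch of how the `None` fallback could be documented, written as a standalone function. Two assumptions are labeled in the code: the fallback limit is a fixed `DefaultMaxFields = 25` rather than the config-driven `maxNumToStringFields`, and the "... N more fields" placeholder is counted as one of the `maxFields` entries, so `maxFields - 1` real elements are kept. This is an illustration, not Spark's implementation.

```scala
object TruncatedStringSketch {
  // Assumed fallback limit for this sketch; in the diff the fallback comes from
  // maxNumToStringFields (the larger of the legacy and SQL config values).
  val DefaultMaxFields: Int = 25

  /**
   * Formats `seq` like `seq.mkString(start, sep, end)` but emits at most
   * `maxFields` entries, counting a trailing "... N more fields" placeholder as
   * one of them (an assumption of this sketch).
   *
   * @param maxFields `Some(n)` caps the output at `n` entries; `None` means
   *                  "use the configured default" (here `DefaultMaxFields`).
   */
  def truncatedString[T](
      seq: Seq[T],
      start: String,
      sep: String,
      end: String,
      maxFields: Option[Int]): String = {
    val maxNumFields = maxFields.getOrElse(DefaultMaxFields)
    if (seq.length > maxNumFields) {
      val numFields = math.max(0, maxNumFields - 1)
      seq.take(numFields).mkString(start, sep, sep) +
        "... " + (seq.length - numFields) + " more fields" + end
    } else {
      seq.mkString(start, sep, end)
    }
  }
}
```

For example, `truncatedString(Seq(1, 2, 3, 4, 5), "[", ", ", "]", Some(3))` yields `[1, 2, ... 3 more fields]`.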
[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22429#discussion_r223982046 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala --- @@ -167,6 +172,58 @@ package object util { builder.toString() } + /** + * The performance overhead of creating and logging strings for wide schemas can be large. To + * limit the impact, we bound the number of fields to include by default. This can be overridden + * by setting the 'spark.debug.maxToStringFields' conf in SparkEnv or by settings the SQL config + * `spark.sql.debug.maxToStringFields`. + */ + private[spark] def maxNumToStringFields: Int = { +val legacyLimit = if (SparkEnv.get != null) { --- End diff -- Just for context, why do you want to retain the legacy behavior? It is probably fine to break it.
[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22429#discussion_r223980665 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala --- @@ -455,21 +457,37 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { }.mkString(", ") /** ONE line description of this node. */ - def simpleString: String = s"$nodeName $argString".trim + def simpleString(maxFields: Option[Int]): String = { --- End diff -- Please document the `maxFields` parameter. I am especially interested in what `None` represents here.
[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22429#discussion_r223979931 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala --- @@ -52,7 +52,7 @@ case class CatalystDataToAvro(child: Expression) extends UnaryExpression { out.toByteArray } - override def simpleString: String = { + override def simpleString(maxFields: Option[Int]): String = { s"to_avro(${child.sql}, ${child.dataType.simpleString})" --- End diff -- Should we also pass `maxFields` to `child.dataType`? For example, `StructType` fields are truncated.
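A toy illustration of forwarding the limit: the wrapper expression passes the caller's `maxFields` into its child's type, so a wide struct is truncated consistently with the rest of the plan string. The data-type classes are hypothetical and take a plain `Int` instead of `Option[Int]` for brevity; the truncation rule reuses the "keep `maxFields - 1`, then a placeholder" assumption.

```scala
object NestedMaxFieldsSketch {
  // Hypothetical mini data-type model; not Spark's DataType hierarchy.
  sealed trait DataTypeSketch { def simpleString(maxFields: Int): String }

  case object IntT extends DataTypeSketch {
    def simpleString(maxFields: Int): String = "int"
  }

  final case class StructT(fields: Seq[(String, DataTypeSketch)]) extends DataTypeSketch {
    def simpleString(maxFields: Int): String = {
      // Nested types receive the same limit.
      val rendered = fields.map { case (name, tpe) => s"$name:${tpe.simpleString(maxFields)}" }
      if (rendered.length > maxFields) {
        val kept = math.max(0, maxFields - 1)
        rendered.take(kept).mkString("struct<", ",", ",") +
          s"... ${rendered.length - kept} more fields>"
      } else {
        rendered.mkString("struct<", ",", ">")
      }
    }
  }

  // An expression wrapper that forwards the caller's limit to its child's type
  // instead of rendering the type with a fixed default.
  final case class ToAvroSketch(childSql: String, childType: DataTypeSketch) {
    def simpleString(maxFields: Int): String =
      s"to_avro($childSql, ${childType.simpleString(maxFields)})"
  }

  val example = StructT(Seq("a" -> IntT, "b" -> IntT, "c" -> IntT))
}
```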
[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22429#discussion_r223979392 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -633,4 +633,14 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val MAX_TO_STRING_FIELDS = +ConfigBuilder("spark.debug.maxToStringFields") + .internal() + .doc("Maximum number of fields of sequence-like entries that can be converted to strings " + --- End diff -- What is a sequence-like entry?
[GitHub] spark issue #22429: [SPARK-25440][SQL] Dumping query execution info to a fil...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22429 @boy-uber the thing you are suggesting is a pretty big undertaking and beyond the scope of this PR. If you are going to add structured plans to the explain output, you probably also want some guarantees about stability across multiple Spark versions, and you probably also want to be able to reconstruct the plan. Neither is easy. If you want to have this in Spark, then I suggest sending a proposal to the dev list.
[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22674#discussion_r223886858 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala --- @@ -75,95 +76,74 @@ trait QueryExecutionListener { */ @Experimental @InterfaceStability.Evolving -class ExecutionListenerManager private extends Logging { - - private[sql] def this(conf: SparkConf) = { -this() +// The `session` is used to indicate which session carries this listener manager, and we only +// catch SQL executions which are launched by the same session. +// The `loadExtensions` flag is used to indicate whether we should load the pre-defined, +// user-specified listeners during construction. We should not do it when cloning this listener +// manager, as we will copy all listeners to the cloned listener manager. +class ExecutionListenerManager private[sql](session: SparkSession, loadExtensions: Boolean) + extends SparkListener with Logging { + + private[this] val listeners = new CopyOnWriteArrayList[QueryExecutionListener] + + if (loadExtensions) { +val conf = session.sparkContext.conf conf.get(QUERY_EXECUTION_LISTENERS).foreach { classNames => Utils.loadExtensions(classOf[QueryExecutionListener], classNames, conf).foreach(register) } } + session.sparkContext.listenerBus.addToSharedQueue(this) + /** * Registers the specified [[QueryExecutionListener]]. */ @DeveloperApi - def register(listener: QueryExecutionListener): Unit = writeLock { -listeners += listener + def register(listener: QueryExecutionListener): Unit = { +listeners.add(listener) } /** * Unregisters the specified [[QueryExecutionListener]]. */ @DeveloperApi - def unregister(listener: QueryExecutionListener): Unit = writeLock { -listeners -= listener + def unregister(listener: QueryExecutionListener): Unit = { +listeners.remove(listener) } /** * Removes all the registered [[QueryExecutionListener]]. 
*/ @DeveloperApi - def clear(): Unit = writeLock { + def clear(): Unit = { listeners.clear() } /** * Get an identical copy of this listener manager. */ - @DeveloperApi - override def clone(): ExecutionListenerManager = writeLock { -val newListenerManager = new ExecutionListenerManager -listeners.foreach(newListenerManager.register) + private[sql] def clone(session: SparkSession): ExecutionListenerManager = { +val newListenerManager = new ExecutionListenerManager(session, loadExtensions = false) +listeners.iterator().asScala.foreach(newListenerManager.register) newListenerManager } - private[sql] def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { -readLock { - withErrorHandling { listener => -listener.onSuccess(funcName, qe, duration) + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { +case e: SparkListenerSQLExecutionEnd if shouldCatchEvent(e) => + val funcName = e.executionName.get + e.executionFailure match { +case Some(ex) => + listeners.iterator().asScala.foreach(_.onFailure(funcName, e.qe, ex)) --- End diff -- This is a bit of a high-level thought: you could consider making the calling event queue responsible for dispatching these events. That way you can leverage any improvement to the underlying event bus. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
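To illustrate the idea of letting the queue own dispatch, here is a minimal sketch. Every name in it (`Event`, `QueryEnd`, `QueryListener`, `EventQueue`, `failedDeliveries`) is a hypothetical stand-in, not Spark's listener bus API. It shows one place that iterates listeners and isolates each listener's failures, so an improvement made there benefits every listener.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.util.control.NonFatal

// Hypothetical event type standing in for SparkListenerSQLExecutionEnd.
sealed trait Event
final case class QueryEnd(name: String, failure: Option[Exception]) extends Event

// Hypothetical listener interface standing in for QueryExecutionListener.
trait QueryListener {
  def onSuccess(name: String): Unit
  def onFailure(name: String, e: Exception): Unit
}

// The queue owns registration and dispatch, so error isolation (and any
// future improvement such as metrics or async delivery) lives in one place.
final class EventQueue {
  private val listeners = new CopyOnWriteArrayList[QueryListener]()
  private var failed = 0

  def register(l: QueryListener): Unit = { listeners.add(l); () }

  def failedDeliveries: Int = failed

  def post(event: Event): Unit = {
    // CopyOnWriteArrayList iterators work on a snapshot, so concurrent
    // register calls cannot break this loop.
    val it = listeners.iterator()
    while (it.hasNext) {
      val l = it.next()
      try dispatch(l, event)
      catch { case NonFatal(_) => failed += 1 } // one bad listener does not stop the others
    }
  }

  private def dispatch(l: QueryListener, event: Event): Unit = event match {
    case QueryEnd(name, Some(ex)) => l.onFailure(name, ex)
    case QueryEnd(name, None)     => l.onSuccess(name)
  }
}
```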
[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22674#discussion_r223885742 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala --- @@ -75,95 +76,74 @@ trait QueryExecutionListener { */ @Experimental @InterfaceStability.Evolving -class ExecutionListenerManager private extends Logging { - - private[sql] def this(conf: SparkConf) = { -this() +// The `session` is used to indicate which session carries this listener manager, and we only +// catch SQL executions which are launched by the same session. +// The `loadExtensions` flag is used to indicate whether we should load the pre-defined, +// user-specified listeners during construction. We should not do it when cloning this listener +// manager, as we will copy all listeners to the cloned listener manager. +class ExecutionListenerManager private[sql](session: SparkSession, loadExtensions: Boolean) + extends SparkListener with Logging { + + private[this] val listeners = new CopyOnWriteArrayList[QueryExecutionListener] + + if (loadExtensions) { +val conf = session.sparkContext.conf conf.get(QUERY_EXECUTION_LISTENERS).foreach { classNames => Utils.loadExtensions(classOf[QueryExecutionListener], classNames, conf).foreach(register) } } + session.sparkContext.listenerBus.addToSharedQueue(this) + /** * Registers the specified [[QueryExecutionListener]]. */ @DeveloperApi - def register(listener: QueryExecutionListener): Unit = writeLock { -listeners += listener + def register(listener: QueryExecutionListener): Unit = { +listeners.add(listener) } /** * Unregisters the specified [[QueryExecutionListener]]. */ @DeveloperApi - def unregister(listener: QueryExecutionListener): Unit = writeLock { -listeners -= listener + def unregister(listener: QueryExecutionListener): Unit = { +listeners.remove(listener) } /** * Removes all the registered [[QueryExecutionListener]]. 
*/ @DeveloperApi - def clear(): Unit = writeLock { + def clear(): Unit = { listeners.clear() } /** * Get an identical copy of this listener manager. */ - @DeveloperApi - override def clone(): ExecutionListenerManager = writeLock { -val newListenerManager = new ExecutionListenerManager -listeners.foreach(newListenerManager.register) + private[sql] def clone(session: SparkSession): ExecutionListenerManager = { +val newListenerManager = new ExecutionListenerManager(session, loadExtensions = false) +listeners.iterator().asScala.foreach(newListenerManager.register) newListenerManager } - private[sql] def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { -readLock { - withErrorHandling { listener => -listener.onSuccess(funcName, qe, duration) + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { +case e: SparkListenerSQLExecutionEnd if shouldCatchEvent(e) => + val funcName = e.executionName.get + e.executionFailure match { +case Some(ex) => + listeners.iterator().asScala.foreach(_.onFailure(funcName, e.qe, ex)) +case _ => + listeners.iterator().asScala.foreach(_.onSuccess(funcName, e.qe, e.duration)) } -} - } - private[sql] def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { -readLock { - withErrorHandling { listener => -listener.onFailure(funcName, qe, exception) - } -} +case _ => // Ignore } - private[this] val listeners = ListBuffer.empty[QueryExecutionListener] - - /** A lock to prevent updating the list of listeners while we are traversing through them. */ - private[this] val lock = new ReentrantReadWriteLock() - - private def withErrorHandling(f: QueryExecutionListener => Unit): Unit = { -for (listener <- listeners) { - try { -f(listener) - } catch { -case NonFatal(e) => logWarning("Error executing query execution listener", e) - } -} - } - - /** Acquires a read lock on the cache for the duration of `f`. 
*/ - private def readLock[A](f: => A): A = { -val rl = lock.readLock() -rl.lock() -try f finally { - rl.unlock() -} - } - - /** Acquires a write lock on the cache for the duration of `f`. */ - private def writeLock[A](f: => A): A = { -val wl = lo
[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22674#discussion_r223873662 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala --- @@ -75,95 +76,74 @@ trait QueryExecutionListener { */ @Experimental @InterfaceStability.Evolving -class ExecutionListenerManager private extends Logging { - - private[sql] def this(conf: SparkConf) = { -this() +// The `session` is used to indicate which session carries this listener manager, and we only --- End diff -- Why is this not a class doc?
[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22674#discussion_r223873406 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala --- @@ -39,7 +39,14 @@ case class SparkListenerSQLExecutionStart( @DeveloperApi case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long) - extends SparkListenerEvent + extends SparkListenerEvent { + + @JsonIgnore private[sql] var executionName: Option[String] = None --- End diff -- Why do we want to be backwards compatible here? SHS?
[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16677 1. `numOutputs` is the number of records. 2. 8 bytes per `MapStatus`.
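Point 2 describes a per-task cost. Keeping one `Long` record count per `MapStatus` adds 8 bytes for every map task that is tracked. The back-of-the-envelope sketch below uses a hypothetical helper, `recordCountOverheadBytes`, and ignores JVM object headers and any other bookkeeping.

```scala
// Hypothetical helper: extra bytes for storing one Long record count per MapStatus.
def recordCountOverheadBytes(numMapStatuses: Long): Long =
  numMapStatuses * java.lang.Long.BYTES // 8 bytes per MapStatus
```

For example, 10,000 map tasks add 80,000 bytes of record counts.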
[GitHub] spark issue #22429: [SPARK-25440][SQL] Dumping query execution info to a fil...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22429 @MaxGekk please just modify `simpleString`; it is an internal API for this reason. @rednaxelafx the rope approach has the benefit that it does not create a ton of intermediate buffers. We could do that as well.
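The rope idea is to collect string fragments by reference and materialize them only once, instead of building intermediate buffers at every level. The sketch below is simplified and flat: it is a list of fragments, not a balanced tree. `StringRope` is a hypothetical name, not an existing Spark class.

```scala
import java.io.Writer
import scala.collection.mutable.ArrayBuffer

// A simplified, flat rope: fragments are kept as-is and joined only once.
final class StringRope {
  private val parts = ArrayBuffer.empty[String]
  private var totalLength = 0

  def append(s: String): this.type = { parts += s; totalLength += s.length; this }

  // Materialize once, with the final size known up front (no buffer regrowth).
  override def toString: String = {
    val sb = new java.lang.StringBuilder(totalLength)
    parts.foreach(p => sb.append(p))
    sb.toString
  }

  // Alternatively, stream the fragments to a sink without building one big String.
  def writeTo(w: Writer): Unit = parts.foreach(p => w.write(p))
}
```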
[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22429#discussion_r217928428 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala --- @@ -250,5 +254,36 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { def codegenToSeq(): Seq[(String, String)] = { org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan) } + +/** + * Dumps debug information about query execution into the specified file. + */ +def toFile(path: String): Unit = { + val maxFields = SparkEnv.get.conf.getInt(Utils.MAX_TO_STRING_FIELDS, +Utils.DEFAULT_MAX_TO_STRING_FIELDS) + val filePath = new Path(path) + val fs = FileSystem.get(filePath.toUri, sparkSession.sessionState.newHadoopConf()) + val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath))) + + try { +SparkEnv.get.conf.set(Utils.MAX_TO_STRING_FIELDS, Int.MaxValue.toString) +writer.write("== Parsed Logical Plan ==\n") --- End diff -- Can we combine this entire block with what is done in the `toString()` method?
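One way to share the block is to render all sections through a single method that takes a `java.io.Writer`. `toString` can then pass a `StringWriter`, and the file dump can pass a file writer. The sketch below assumes a hypothetical `PlanReport` class whose sections are plain `(title, body)` pairs. It stands in for `QueryExecution` and uses a local `java.nio` file instead of Hadoop's `FileSystem`.

```scala
import java.io.{StringWriter, Writer}
import java.nio.file.{Files, Path}

// Hypothetical stand-in for QueryExecution: each section is a (title, body) pair.
final class PlanReport(sections: Seq[(String, String)]) {
  // The single rendering path shared by toString and toFile.
  def writeTo(w: Writer): Unit = sections.foreach { case (title, body) =>
    w.write(s"== $title ==\n")
    w.write(body)
    w.write("\n")
  }

  override def toString: String = {
    val sw = new StringWriter()
    writeTo(sw)
    sw.toString
  }

  def toFile(path: Path): Unit = {
    val w = Files.newBufferedWriter(path) // UTF-8 by default
    try writeTo(w) finally w.close()
  }
}
```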
[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22429#discussion_r217928334 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala --- @@ -250,5 +254,36 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { def codegenToSeq(): Seq[(String, String)] = { org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan) } + +/** + * Dumps debug information about query execution into the specified file. + */ +def toFile(path: String): Unit = { + val maxFields = SparkEnv.get.conf.getInt(Utils.MAX_TO_STRING_FIELDS, +Utils.DEFAULT_MAX_TO_STRING_FIELDS) + val filePath = new Path(path) + val fs = FileSystem.get(filePath.toUri, sparkSession.sessionState.newHadoopConf()) + val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath))) + + try { +SparkEnv.get.conf.set(Utils.MAX_TO_STRING_FIELDS, Int.MaxValue.toString) --- End diff -- It is generally a bad idea to change this conf, because people expect it to be immutable. This change also has far-reaching consequences: others will now also be exposed to a different `Utils.MAX_TO_STRING_FIELDS` value when calling `explain()`. Can you please just pass the parameter down the tree?
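Passing the limit down the tree means each call carries its own `maxFields` argument. Nothing global is written, so concurrent `explain()` calls are unaffected. The sketch below uses a hypothetical `Node` class and a made-up truncation format, not `TreeNode`'s actual output.

```scala
// Hypothetical tree node: the field limit is an argument, not a global setting.
final case class Node(name: String, fields: Seq[String], children: Seq[Node] = Nil) {
  def treeString(maxFields: Int): String = {
    val sb = new StringBuilder
    build(0, maxFields, sb)
    sb.toString
  }

  private def build(depth: Int, maxFields: Int, sb: StringBuilder): Unit = {
    val shown =
      if (fields.length > maxFields)
        fields.take(maxFields) :+ s"... ${fields.length - maxFields} more fields"
      else fields
    sb.append("  " * depth).append(name).append(shown.mkString(" [", ", ", "]")).append('\n')
    children.foreach(_.build(depth + 1, maxFields, sb)) // same limit passed to every child
  }
}
```

A file dump can call `treeString(Int.MaxValue)` while `explain()` keeps using the configured default.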
[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22429#discussion_r217928262 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala --- @@ -469,7 +470,17 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { def treeString: String = treeString(verbose = true) def treeString(verbose: Boolean, addSuffix: Boolean = false): String = { -generateTreeString(0, Nil, new StringBuilder, verbose = verbose, addSuffix = addSuffix).toString +val baos = new ByteArrayOutputStream() --- End diff -- What is the benefit of using this instead of using a `java.io.StringWriter` or `org.apache.commons.io.output.StringBuilderWriter`?
[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22429#discussion_r217915071 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala --- @@ -250,5 +253,35 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { def codegenToSeq(): Seq[(String, String)] = { org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan) } + +/** + * Dumps debug information about query execution into the specified file. + */ +def toFile(path: String): Unit = { + val filePath = new Path(path) + val fs = FileSystem.get(filePath.toUri, sparkSession.sparkContext.hadoopConfiguration) --- End diff -- Why use the Hadoop configuration of the `SparkContext`? It is probably better to use the one that `sparkSession.sessionState.newHadoopConf()` provides.
[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22429#discussion_r217913739 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala --- @@ -469,7 +470,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { def treeString: String = treeString(verbose = true) def treeString(verbose: Boolean, addSuffix: Boolean = false): String = { -generateTreeString(0, Nil, new StringBuilder, verbose = verbose, addSuffix = addSuffix).toString +val bos = new ByteArrayOutputStream() +treeString(bos, verbose, addSuffix) +bos.toString + } + + def treeString(os: OutputStream, verbose: Boolean, addSuffix: Boolean): Unit = { --- End diff -- Can you please use a `java.io.Writer` or something else you can directly write a string to? You are now using `getBytes()` everywhere and that is far from cheap because it needs to encode the chars and allocate a byte array for each string.
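With a `java.io.Writer` sink, strings are written as characters. There is no per-fragment `getBytes()` encoding and no byte-array allocation, and a `StringWriter` still gives you the `String` form. `Tree` below is a hypothetical stand-in, not Spark's `TreeNode`.

```scala
import java.io.{StringWriter, Writer}

// Hypothetical tree whose text form is generated straight into a Writer.
final case class Tree(label: String, children: List[Tree] = Nil) {
  def treeString(writer: Writer): Unit = generate(0, writer)

  def treeString: String = {
    val sw = new StringWriter() // character-based sink, no byte encoding involved
    treeString(sw)
    sw.toString
  }

  private def generate(depth: Int, writer: Writer): Unit = {
    writer.write("  " * depth)
    writer.write(label)
    writer.write("\n")
    children.foreach(_.generate(depth + 1, writer))
  }
}
```

The same `treeString(writer)` overload can be handed a buffered file writer when dumping to disk.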
[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r217841164 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp + + +/** + * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified + * expressions. + * + * @param expressions a sequence of expressions that determine the value of each column of the + *output row. 
+ */ +class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection { + def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) = +this(toBoundExprs(expressions, inputSchema)) + + private[this] val buffer = new Array[Any](expressions.size) + + override def initialize(partitionIndex: Int): Unit = { +expressions.foreach(_.foreach { + case n: Nondeterministic => n.initialize(partitionIndex) + case _ => +}) + } + + private[this] val validExprs = expressions.zipWithIndex.filter { +case (NoOp, _) => false +case _ => true + } + private[this] var mutableRow: InternalRow = new GenericInternalRow(expressions.size) + def currentValue: InternalRow = mutableRow + + override def target(row: InternalRow): MutableProjection = { +mutableRow = row +this + } + + override def apply(input: InternalRow): InternalRow = { +validExprs.foreach { case (expr, i) => --- End diff -- Can you please use the old code? That should be much more performant than this.
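This sketch is written in the array-and-`while`-loop style that per-row code usually favors over `Seq.foreach` with a tuple-matching closure. It is not a reproduction of the earlier `InterpretedMutableProjection` code. Expressions are modelled as hypothetical `Array[Any] => Any` functions, and `None` plays the role of `NoOp` (leave that column untouched). Valid expressions and their output ordinals are split into two plain arrays once, up front, so the hot loop does only indexed access.

```scala
// Hypothetical projection: Some(f) computes a column, None means "leave it unchanged".
final class ArrayProjection(exprs: Seq[Option[Array[Any] => Any]]) {
  private[this] val valid = exprs.zipWithIndex.collect { case (Some(f), i) => (f, i) }
  private[this] val exprArray: Array[Array[Any] => Any] = valid.map(_._1).toArray
  private[this] val ordinals: Array[Int] = valid.map(_._2).toArray
  private[this] val buffer = new Array[Any](exprArray.length)
  private[this] val target = new Array[Any](exprs.length)

  def apply(input: Array[Any]): Array[Any] = {
    var j = 0
    // Evaluate into a buffer first so the target row is updated all at once.
    while (j < exprArray.length) {
      buffer(j) = exprArray(j)(input)
      j += 1
    }
    j = 0
    while (j < exprArray.length) {
      target(ordinals(j)) = buffer(j)
      j += 1
    }
    target
  }
}
```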
[GitHub] spark issue #22417: [SPARK-25426][SQL] Remove the duplicate fallback logic i...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22417 LGTM
[GitHub] spark pull request #22344: [SPARK-25352][SQL] Perform ordered global limit w...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22344#discussion_r217070658 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- @@ -68,22 +68,42 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object SpecialLimits extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case ReturnAnswer(rootPlan) => rootPlan match { -case Limit(IntegerLiteral(limit), Sort(order, true, child)) -if limit < conf.topKSortFallbackThreshold => - TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil -case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) -if limit < conf.topKSortFallbackThreshold => - TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil +case Limit(IntegerLiteral(limit), s@Sort(order, true, child)) => + if (limit < conf.topKSortFallbackThreshold) { --- End diff -- @viirya sorry to be a little late to the party. This pattern is repeated 4x; can you just move it into a helper function?
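Moving the repeated branch into a helper means the threshold check is written once. Each match case then only extracts its pieces (limit, sort, optional project list). This is a sketch over heavily simplified, hypothetical plan classes, not Spark's `LogicalPlan`/`SparkPlan` types.

```scala
// Hypothetical, heavily simplified logical plans.
sealed trait Logical
final case class Relation(name: String) extends Logical
final case class Sort(order: Seq[String], global: Boolean, child: Logical) extends Logical
final case class Project(cols: Seq[String], child: Logical) extends Logical
final case class Limit(n: Int, child: Logical) extends Logical

// Hypothetical physical choices.
sealed trait Physical
final case class TakeOrderedAndProject(
    limit: Int, order: Seq[String], projectList: Option[Seq[String]], child: Logical) extends Physical
final case class Fallback(plan: Logical) extends Physical

// The shared branch: one place decides between top-K and the regular plan.
def planTopK(whole: Logical, limit: Int, sort: Sort,
             projectList: Option[Seq[String]], threshold: Int): Physical =
  if (limit < threshold) TakeOrderedAndProject(limit, sort.order, projectList, sort.child)
  else Fallback(whole)

def plan(p: Logical, threshold: Int): Physical = p match {
  case Limit(n, s @ Sort(_, true, _))                => planTopK(p, n, s, None, threshold)
  case Limit(n, Project(cols, s @ Sort(_, true, _))) => planTopK(p, n, s, Some(cols), threshold)
  case other                                         => Fallback(other)
}
```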
[GitHub] spark pull request #22205: [SPARK-25212][SQL] Support Filter in ConvertToLoc...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22205#discussion_r213124828 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1349,6 +1353,12 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] { case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) => LocalRelation(output, data.take(limit), isStreaming) + +case Filter(condition, LocalRelation(output, data, isStreaming)) +if !hasUnevaluableExpr(condition) => --- End diff -- I suppose it is fine in this case. The only thing is that it violates the contract of the optimizer: it should not change the results of a query.
[GitHub] spark issue #22205: [SPARK-25212][SQL] Support Filter in ConvertToLocalRelat...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22205 @gatorsmile what are you afraid of exactly? We could check which tests are affected. Also do you want to disable this for testing only?
[GitHub] spark issue #22239: [SPARK-19355][SQL][Followup] Remove the child.outputPart...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22239 Shall we rename it to: **[SPARK-19355][SQL][Followup] Remove the child.outputOrdering check in global limit**?
[GitHub] spark issue #22239: [SPARK-19355][SQL][Followup] Remove the child.outputPart...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22239 LGTM. Let's wait a little before merging to allow others to comment.
[GitHub] spark issue #22239: [SPARK-19355][SQL][Followup] Remove the child.outputPart...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22239 Setting `spark.sql.limit.flatGlobalLimit` to `false` works for me.
[GitHub] spark issue #22239: [SPARK-19355][SQL][Followup] Remove the child.outputPart...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22239 cc @cloud-fan for a sanity check.
[GitHub] spark issue #22239: [SPARK-19355][SQL][Followup] Remove the child.outputPart...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22239 @viirya did you try to run `TakeOrderedAndProjectSuite`? I am pretty sure that will fail now ;)...
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r212830045 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(childRDD) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. 
+ val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// During global limit, try to evenly distribute limited rows across data +// partitions. If disabled, scanning data partitions sequentially until reaching limit number. +// Besides, if child output has certain ordering, we can't evenly pick up rows from +// each parititon. +val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil --- End diff -- `select * from table order by a limit 10` gets planned differently, right? It should use `TakeOrderedAndProjectExec`. There is nothing in the SQL standard that mandates that a nested order by followed by a limit has to respect that ordering clause. In fact, AFAIR, the standard does not even support nested limits (they make stuff non-deterministic). If we end up supporting this, then I'd rather have an explicit flag in `GlobalLimitExec` (`orderedLimit` or something like that) and set that during planning by matching on `Limit(limit, Sort(order, true, child))`. I want the explicit flag because then we can figure out what limit is doing by looking at the physical plan. I want to explicitly check for an underlying sort to match the current `TakeOrderedAndProjectExec` semantics and to avoid weird behavior caused by something way down the plan setting some arbitrary ordering.
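The explicit-flag proposal can be sketched as a planner rule that sets `orderedLimit` only when the limit sits directly on a global sort. An ordering introduced further down the plan is deliberately ignored. All classes here are hypothetical simplifications; `orderedLimit` is the flag name suggested above, not an existing field of `GlobalLimitExec`.

```scala
// Hypothetical, simplified logical plans.
sealed trait Logical
final case class Relation(name: String) extends Logical
final case class Sort(order: Seq[String], global: Boolean, child: Logical) extends Logical
final case class Filter(cond: String, child: Logical) extends Logical
final case class Limit(n: Int, child: Logical) extends Logical

// Hypothetical physical limit with the flag visible in the plan.
final case class GlobalLimit(limit: Int, orderedLimit: Boolean, child: Logical)

def planLimit(p: Limit): GlobalLimit = p match {
  // Only a limit directly on top of a global sort is treated as ordered.
  case Limit(n, s @ Sort(_, true, _)) => GlobalLimit(n, orderedLimit = true, child = s)
  // Anything else, including a sort further down the tree, is unordered.
  case Limit(n, child) => GlobalLimit(n, orderedLimit = false, child = child)
}
```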
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r212805707 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(childRDD) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. 
+ val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// During global limit, try to evenly distribute limited rows across data +// partitions. If disabled, scanning data partitions sequentially until reaching limit number. +// Besides, if child output has certain ordering, we can't evenly pick up rows from +// each parititon. +val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil --- End diff -- If we remove it, we may need to put it behind a feature flag first, since people may rely on the old behavior. Anyway, all of this is up for debate.
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r212805327 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(childRDD) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. 
+ val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// During global limit, try to evenly distribute limited rows across data +// partitions. If disabled, scanning data partitions sequentially until reaching limit number. +// Besides, if child output has certain ordering, we can't evenly pick up rows from +// each parititon. +val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil --- End diff -- @viirya dumb question: what is `child.outputOrdering` doing here? I am not entirely sure that we should guarantee that you get the lowest elements of a dataset if you perform a limit in the middle of a query (a top-level sort-limit does have this guarantee). I also don't think the SQL standard supports/mandates this. Moreover, checking `child.outputOrdering` only checks the order within each partition, not the order of the frame as a whole. You should also take `child.outputPartitioning` into account. I would be slightly in favor of removing the `child.outputOrdering` check.
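The point about per-partition ordering can be shown with plain collections. Each partition below is sorted on its own, but the partitions are not range-partitioned. Scanning them in order and stopping at the limit therefore does not return the smallest values a top-level `ORDER BY ... LIMIT` would. The data and helper names are hypothetical.

```scala
// Hypothetical data: each partition is sorted internally, but partition 0
// holds larger values than partition 1, so the dataset is not globally sorted.
val partitions: Seq[Seq[Int]] = Seq(Seq(5, 6, 7), Seq(1, 2, 3))

// Roughly what an outputOrdering-style check sees: order within each partition only.
def isSortedWithinPartitions(ps: Seq[Seq[Int]]): Boolean =
  ps.forall(p => p == p.sorted)

// Scanning partitions sequentially until the limit is reached.
def sequentialLimit(ps: Seq[Seq[Int]], n: Int): Seq[Int] = ps.flatten.take(n)

// What a top-level sort followed by a limit must return.
def globalTopK(ps: Seq[Seq[Int]], n: Int): Seq[Int] = ps.flatten.sorted.take(n)
```

Here the per-partition check passes, yet `sequentialLimit(partitions, 2)` yields `Seq(5, 6)` while `globalTopK(partitions, 2)` yields `Seq(1, 2)`.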
[GitHub] spark issue #22216: [SPARK-25223][SQL] Use a map to store values for NamedLa...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22216 I think the use of global state and a thread local is far more hacky and probably slower. The only clean solution I see here is to pass the lambda values around using the input row. I am not saying that this is easy.
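Passing lambda values through the input row can be sketched with a tiny expression evaluator. The lambda variable is just a bound reference to an extra slot appended to the row. Evaluating the body needs no global map or `ThreadLocal`, and nested lambdas append further slots. The language below (`Expr`, `BoundRef`, `Add`, `Transform`) is hypothetical and only mirrors the idea, not Spark's `NamedLambdaVariable` machinery.

```scala
// Hypothetical mini expression language evaluated against a row of values.
sealed trait Expr { def eval(row: IndexedSeq[Any]): Any }

final case class BoundRef(ordinal: Int) extends Expr {
  def eval(row: IndexedSeq[Any]): Any = row(ordinal)
}

final case class Add(l: Expr, r: Expr) extends Expr {
  def eval(row: IndexedSeq[Any]): Any =
    l.eval(row).asInstanceOf[Int] + r.eval(row).asInstanceOf[Int]
}

// transform(array, x -> body): the body reads the lambda value at ordinal row.length,
// because each element is appended to the input row before the body is evaluated.
final case class Transform(array: Expr, body: Expr) extends Expr {
  def eval(row: IndexedSeq[Any]): Any =
    array.eval(row).asInstanceOf[Seq[Any]].map(x => body.eval(row :+ x))
}
```

For a row `(10, [1, 2, 3])`, the lambda variable is bound to ordinal 2, so `transform(col1, x -> x + col0)` becomes `Transform(BoundRef(1), Add(BoundRef(2), BoundRef(0)))`.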
[GitHub] spark pull request #21899: [SPARK-24912][SQL] Don't obscure source of OOM du...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21899#discussion_r211044133 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala --- @@ -118,12 +119,20 @@ case class BroadcastExchangeExec( // SparkFatalException, which is a subclass of Exception. ThreadUtils.awaitResult // will catch this exception and re-throw the wrapped fatal throwable. case oe: OutOfMemoryError => -throw new SparkFatalException( +val sizeMessage = if (dataSize != -1) { + s"${SparkLauncher.DRIVER_MEMORY} by at least the estimated size of the " + --- End diff -- Forgive me for asking a dumb question, but where will this exception come from? The block manager?
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22062#discussion_r209372979 --- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.sort + +import java.lang.{Long => JLong} + +import org.mockito.Mockito.when +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics} +import org.apache.spark.memory._ +import org.apache.spark.unsafe.Platform + +class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar { + + test("nested spill should be no-op") { +val conf = new SparkConf() + .setMaster("local[1]") + .setAppName("ShuffleExternalSorterSuite") + .set("spark.testing", "true") + .set("spark.testing.memory", "1600") + .set("spark.memory.fraction", "1") +sc = new SparkContext(conf) + +val memoryManager = UnifiedMemoryManager(conf, 1) + +var shouldAllocate = false + +// Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` is true. 
+// This will trigger a nested spill and expose issues if we don't handle this case properly. +val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) { + override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = { +// ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task to use. +// So we leave 400 bytes for the task. +if (shouldAllocate && + memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 400) { + val acquireExecutionMemoryMethod = +memoryManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head + acquireExecutionMemoryMethod.invoke( +memoryManager, +JLong.valueOf( + memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed - 400), +JLong.valueOf(1L), // taskAttemptId +MemoryMode.ON_HEAP + ).asInstanceOf[java.lang.Long] +} +super.acquireExecutionMemory(required, consumer) + } +} +val taskContext = mock[TaskContext] --- End diff -- lol
[GitHub] spark pull request #14083: [SPARK-16406][SQL] Improve performance of Logical...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/14083#discussion_r209336169 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala --- @@ -138,6 +140,88 @@ package object expressions { def indexOf(exprId: ExprId): Int = { Option(exprIdToOrdinal.get(exprId)).getOrElse(-1) } + +private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = { + m.mapValues(_.distinct).map(identity) +} + +/** Map to use for direct case insensitive attribute lookups. */ +@transient private lazy val direct: Map[String, Seq[Attribute]] = { + unique(attrs.groupBy(_.name.toLowerCase)) +} + +/** Map to use for qualified case insensitive attribute lookups. */ +@transient private val qualified: Map[(String, String), Seq[Attribute]] = { + val grouped = attrs.filter(_.qualifier.isDefined).groupBy { a => +(a.qualifier.get.toLowerCase, a.name.toLowerCase) + } + unique(grouped) +} + +/** Perform attribute resolution given a name and a resolver. */ +def resolve(nameParts: Seq[String], resolver: Resolver): Option[NamedExpression] = { + // Collect matching attributes given a name and a lookup. + def collectMatches(name: String, candidates: Option[Seq[Attribute]]): Seq[Attribute] = { +candidates.toSeq.flatMap(_.collect { + case a if resolver(a.name, name) => a.withName(name) +}) + } + + // Find matches for the given name assuming that the 1st part is a qualifier (i.e. table name, + // alias, or subquery alias) and the 2nd part is the actual name. This returns a tuple of + // matched attributes and a list of parts that are to be resolved. + // + // For example, consider an example where "a" is the table name, "b" is the column name, + // and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute will be "a.b", + // and the second element will be List("c"). 
+ val matches = nameParts match { +case qualifier +: name +: nestedFields => + val key = (qualifier.toLowerCase, name.toLowerCase) + val attributes = collectMatches(name, qualified.get(key)).filter { a => +resolver(qualifier, a.qualifier.get) + } + (attributes, nestedFields) +case all => + (Nil, all) + } + + // If none of attributes match `table.column` pattern, we try to resolve it as a column. + val (candidates, nestedFields) = matches match { +case (Seq(), _) => + val name = nameParts.head + val attributes = collectMatches(name, direct.get(name.toLowerCase)) + (attributes, nameParts.tail) +case _ => matches + } + + def name = UnresolvedAttribute(nameParts).name + candidates match { +case Seq(a) if nestedFields.nonEmpty => + // One match, but we also need to extract the requested nested field. + // The foldLeft adds ExtractValues for every remaining parts of the identifier, + // and aliased it with the last part of the name. + // For example, consider "a.b.c", where "a" is resolved to an existing attribute. + // Then this will add ExtractValue("c", ExtractValue("b", a)), and alias the final + // expression as "c". + val fieldExprs = nestedFields.foldLeft(a: Expression) { (e, name) => +ExtractValue(e, Literal(name), resolver) --- End diff -- @heuermh I have not filed the issue for this. Do you want to work on this?
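The lookup order in the diff can be summarised with a small plain-Scala model (hypothetical `AttributeLookupSketch`; case-insensitive matching only, with no `Resolver` and no ambiguity handling): `qualifier.column` is tried first, and only if that yields nothing is the first part treated as a column name, with the remaining parts becoming nested field accesses.

```scala
// Simplified model of the lookup order (not the Catalyst code): exact,
// case-insensitive matching only, and ambiguity is left to the caller.
object AttributeLookupSketch {
  final case class Attr(qualifier: Option[String], name: String)

  // Returns the matching attributes and the remaining nested-field parts.
  def resolve(attrs: Seq[Attr], nameParts: Seq[String]): (Seq[Attr], Seq[String]) =
    if (nameParts.isEmpty) (Seq.empty, Seq.empty)
    else {
      // 1. Treat the first part as a qualifier (table name or alias).
      val qualified: (Seq[Attr], Seq[String]) = nameParts match {
        case qualifier +: name +: nested =>
          val hits = attrs.filter { a =>
            a.qualifier.exists(_.equalsIgnoreCase(qualifier)) && a.name.equalsIgnoreCase(name)
          }
          (hits, nested)
        case all => (Seq.empty, all)
      }

      // 2. If nothing matched, treat the first part as the column name.
      if (qualified._1.nonEmpty) qualified
      else (attrs.filter(_.name.equalsIgnoreCase(nameParts.head)), nameParts.tail)
    }
}
```

Given attributes `a.b` and `t.a`, the name `a.b.c` resolves to `a.b` with the nested part `c`, whereas `A.x` finds no `A.x` attribute and falls back to the column `t.a` with the nested part `x`.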
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22062#discussion_r209292284 --- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.sort + +import java.lang.{Long => JLong} + +import org.mockito.Mockito.when +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics} +import org.apache.spark.memory._ +import org.apache.spark.unsafe.Platform + +class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar { + + test("nested spill should be no-op") { +val conf = new SparkConf() + .setMaster("local[1]") + .setAppName("ShuffleExternalSorterSuite") + .set("spark.testing", "true") + .set("spark.testing.memory", "1600") + .set("spark.memory.fraction", "1") +sc = new SparkContext(conf) + +val memoryManager = UnifiedMemoryManager(conf, 1) + +var shouldAllocate = false + +// Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` is true. 
+// This will trigger a nested spill and expose issues if we don't handle this case properly. +val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) { + override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = { +// ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task to use. +// So we leave 400 bytes for the task. +if (shouldAllocate && + memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 400) { + val acquireExecutionMemoryMethod = +memoryManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head + acquireExecutionMemoryMethod.invoke( +memoryManager, +JLong.valueOf( + memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed - 400), +JLong.valueOf(1L), // taskAttemptId +MemoryMode.ON_HEAP + ).asInstanceOf[java.lang.Long] +} +super.acquireExecutionMemory(required, consumer) + } +} +val taskContext = mock[TaskContext] --- End diff -- Do we need Mockito here? We could also create a `TaskContextImpl` by hand, right?
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22062#discussion_r209291439 --- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.sort + +import java.lang.{Long => JLong} + +import org.mockito.Mockito.when +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics} +import org.apache.spark.memory._ +import org.apache.spark.unsafe.Platform + +class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar { + + test("nested spill should be no-op") { +val conf = new SparkConf() + .setMaster("local[1]") + .setAppName("ShuffleExternalSorterSuite") + .set("spark.testing", "true") + .set("spark.testing.memory", "1600") + .set("spark.memory.fraction", "1") +sc = new SparkContext(conf) + +val memoryManager = UnifiedMemoryManager(conf, 1) + +var shouldAllocate = false + +// Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` is true. 
+// This will trigger a nested spill and expose issues if we don't handle this case properly. +val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) { + override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = { +// ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task to use. +// So we leave 400 bytes for the task. +if (shouldAllocate && + memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 400) { + val acquireExecutionMemoryMethod = +memoryManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head + acquireExecutionMemoryMethod.invoke( +memoryManager, +JLong.valueOf( + memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed - 400), +JLong.valueOf(1L), // taskAttemptId +MemoryMode.ON_HEAP + ).asInstanceOf[java.lang.Long] +} +super.acquireExecutionMemory(required, consumer) + } +} +val taskContext = mock[TaskContext] +val taskMetrics = new TaskMetrics +when(taskContext.taskMetrics()).thenReturn(taskMetrics) +val sorter = new ShuffleExternalSorter( + taskMemoryManager, + sc.env.blockManager, + taskContext, + 100, // initialSize - This will require ShuffleInMemorySorter to acquire at least 800 bytes + 1, // numPartitions + conf, + new ShuffleWriteMetrics) +val inMemSorter = { + val field = sorter.getClass.getDeclaredField("inMemSorter") + field.setAccessible(true) + field.get(sorter).asInstanceOf[ShuffleInMemorySorter] +} +// Allocate memory to make the next "insertRecord" call trigger a spill. +val bytes = new Array[Byte](1) +while (inMemSorter.hasSpaceForAnotherRecord) { --- End diff -- Access to `hasSpaceForAnotherRecord` is the only reason why we need reflection, right?
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22062#discussion_r209262151 --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java --- @@ -94,12 +94,20 @@ public int numRecords() { } public void reset() { +// Reset `pos` here so that `spill` triggered by the below `allocateArray` will be no-op. +pos = 0; --- End diff -- For my understanding: this is enough to fix the actual issue here, right?
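The reentrancy problem behind the `pos = 0` change can be reproduced with a toy model (hypothetical `NestedSpillSketch`, not Spark's `ShuffleInMemorySorter`): `allocateArray` may call back into `spill`, and if `reset` clears `pos` only after allocating, the nested spill writes the same records a second time. Clearing `pos` first turns the nested spill into a no-op.

```scala
// Toy model of the reentrancy issue (not Spark's ShuffleInMemorySorter):
// allocating the new pointer array can itself trigger a spill of this sorter.
object NestedSpillSketch {
  final class Sorter(resetPosFirst: Boolean) {
    var pos = 0                                                 // buffered records
    var spillOnNextAllocation = false                           // simulates memory pressure
    val spills = scala.collection.mutable.ArrayBuffer.empty[Int] // records written per spill

    def insert(): Unit = pos += 1

    // Writes out the buffered records; a no-op when nothing is buffered.
    def spill(): Unit =
      if (pos > 0) {
        spills += pos
        reset()
      }

    // Stand-in for allocateArray: under memory pressure it calls back into spill().
    private def allocateArray(): Unit =
      if (spillOnNextAllocation) {
        spillOnNextAllocation = false
        spill()
      }

    def reset(): Unit = {
      if (resetPosFirst) pos = 0 // the fix: clear state before allocating
      allocateArray()
      pos = 0
    }
  }

  // Buffers three records, then spills while memory is tight.
  def run(resetPosFirst: Boolean): List[Int] = {
    val sorter = new Sorter(resetPosFirst)
    (1 to 3).foreach(_ => sorter.insert())
    sorter.spillOnNextAllocation = true
    sorter.spill()
    sorter.spills.toList
  }
}
```

`run(resetPosFirst = false)` records two spills of 3 records each (the same data written twice), while `run(resetPosFirst = true)` records a single spill of 3.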
[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/21369 retest this please
[GitHub] spark issue #22064: [MINOR][BUILD] Add ECCN notice required by http://www.ap...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22064 LGTM FWIW
[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16677 Merging to master. Thanks!
[GitHub] spark issue #22057: [SPARK-25077][SQL] Delete unused variable in WindowExec
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22057 So I guess LGTM. I am generally not a fan of these aesthetic changes.
[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16677 retest this please
[GitHub] spark issue #22035: [SPARK-23911][SQL][FOLLOW-UP] Fix examples of aggregate ...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22035 LGTM
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r208273712 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.catalog.SessionCatalog +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.DataType + +/** + * Resolve a higher order functions from the catalog. This is different from regular function + * resolution because lambda functions can only be resolved after the function has been resolved; + * so we need to resolve higher order function when all children are either resolved or a lambda + * function. 
+ */ +case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { +case q: LogicalPlan => + q.transformExpressions { +case u @ UnresolvedFunction(fn, children, false) +if hasLambdaAndResolvedArguments(children) => + withPosition(u) { +catalog.lookupFunction(fn, children) match { + case func: HigherOrderFunction => func + case other => other.failAnalysis( +"A lambda function should only be used in a higher order function. However, " + + s"its class is ${other.getClass.getCanonicalName}, which is not a " + + s"higher order function.") +} + } + } + } + + /** + * Check if the arguments of a function are either resolved or a lambda function. + */ + private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): Boolean = { +val (lambdas, others) = expressions.partition(_.isInstanceOf[LambdaFunction]) +lambdas.nonEmpty && others.forall(_.resolved) + } +} + +/** + * Resolve the lambda variables exposed by a higher order function. + * + * This rule works in two steps: + * [1]. Bind the anonymous variables exposed by the higher order function to the lambda function's + * arguments; this creates named and typed lambda variables. The argument names are checked + * for duplicates and the number of arguments is checked during this step. + * [2]. Resolve the lambda variables used in the lambda function's function expression tree. + * Note that we allow the use of variables from outside the current lambda; this can either + * be a lambda function defined in an outer scope, or an attribute produced by the plan's + * child. If names are duplicated, the name defined in the innermost scope is used.
+ */ +case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] { + + type LambdaVariableMap = Map[String, NamedExpression] + + private val canonicalizer = { +if (!conf.caseSensitiveAnalysis) { + s: String => s.toLowerCase +} else { + s: String => s +} + } + + override def apply(plan: LogicalPlan): LogicalPlan = { +plan.resolveOperators { + case q: LogicalPlan => +q.mapExpressions(resolve(_, Map.empty)) +} + } + + /** + * Create a bound lambda function by binding the arguments of a lambda function to the given + * partial arguments (dataType and nullability only). If the expression happens to be an already + * bound lambda function then we assume it has been bound to the correct arguments and do + * nothing. This
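The scoping rule described in step [2] (inner lambda names shadow outer ones, with names canonicalized when analysis is case-insensitive) can be sketched in plain Scala. This is a simplified model with hypothetical names, not the `ResolveLambdaVariables` rule itself; it ignores types and the fall-through to the child plan's attributes.

```scala
// Simplified model of lambda-variable scoping (not the Catalyst rule): types
// and the fall-through to the child plan's attributes are left out.
object LambdaScopeSketch {
  // Mirrors the idea of the `canonicalizer` in the diff.
  def canonicalizer(caseSensitive: Boolean): String => String =
    if (caseSensitive) ((s: String) => s) else ((s: String) => s.toLowerCase)

  // `scopes` is ordered outermost first; each scope maps a variable name to
  // what it is bound to. Later (inner) scopes overwrite earlier ones.
  def resolve(
      name: String,
      scopes: Seq[Seq[(String, String)]],
      caseSensitive: Boolean): Option[String] = {
    val canon = canonicalizer(caseSensitive)
    val visible = scopes.foldLeft(Map.empty[String, String]) { (acc, scope) =>
      acc ++ scope.map { case (n, binding) => canon(n) -> binding }
    }
    visible.get(canon(name))
  }
}
```

With an outer scope binding `x` and `y` and an inner scope binding `X`, a case-insensitive lookup of `x` picks the inner binding, while a case-sensitive lookup still sees the outer `x`.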
[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21258#discussion_r208199133 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala --- @@ -235,6 +235,69 @@ case class CreateMap(children: Seq[Expression]) extends Expression { override def prettyName: String = "map" } +/** + * Returns a catalyst Map containing the two arrays in children expressions as keys and values. + */ +@ExpressionDescription( + usage = """ +_FUNC_(keys, values) - Creates a map with a pair of the given key/value arrays. All elements + in keys should not be null""", + examples = """ +Examples: + > SELECT _FUNC_([1.0, 3.0], ['2', '4']); + {1.0:"2",3.0:"4"} + """, since = "2.4.0") +case class CreateMapFromArray(left: Expression, right: Expression) +extends BinaryExpression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (ArrayType(_, cn), ArrayType(_, _)) => +if (!cn) { + TypeCheckResult.TypeCheckSuccess +} else { + TypeCheckResult.TypeCheckFailure("All of the given keys should be non-null") +} + case _ => +TypeCheckResult.TypeCheckFailure("The given two arguments should be an array") +} + } + + override def dataType: DataType = { +MapType( + keyType = left.dataType.asInstanceOf[ArrayType].elementType, + valueType = right.dataType.asInstanceOf[ArrayType].elementType, + valueContainsNull = left.dataType.asInstanceOf[ArrayType].containsNull) + } + + override def nullable: Boolean = false + + override def nullSafeEval(keyArray: Any, valueArray: Any): Any = { +val keyArrayData = keyArray.asInstanceOf[ArrayData] --- End diff -- I would like to err on the safe side here. `CreateMap` should be fixed IMO.
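As a sketch of "safe side" runtime behaviour for `map_from_arrays`, assuming plain Scala collections instead of `ArrayData`/`MapData` (the object and method names are hypothetical): keys and values are paired by position, and a null key or a length mismatch is rejected at runtime rather than relying on the type check alone to rule it out.

```scala
// Plain-Scala sketch, not Catalyst's ArrayData/MapData: pairs keys with
// values by position and validates at runtime instead of trusting the
// type check alone. A Seq of pairs keeps the key order of the input.
object MapFromArraysSketch {
  def mapFromArrays[K, V](keys: Seq[K], values: Seq[V]): Seq[(K, V)] = {
    require(keys.length == values.length,
      s"key and value arrays differ in length: ${keys.length} vs ${values.length}")
    require(keys.forall(k => k != null), "map keys must not be null")
    keys.zip(values)
  }
}
```

Mirroring the example in the diff, `mapFromArrays(Seq(1.0, 3.0), Seq("2", "4"))` yields the pairs `(1.0, "2")` and `(3.0, "4")`, while a null key or mismatched lengths throw `IllegalArgumentException`.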
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r208136330 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -365,3 +365,69 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Keys in a map using the transform_keys function. + */ +@ExpressionDescription( + usage = "_FUNC_(expr, func) - Transforms elements in a map using the function.", + examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1); + map(array(2, 3, 4), array(1, 2, 3)) + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v); + map(array(2, 4, 6), array(1, 2, 3)) + """, + since = "2.4.0") +case class TransformKeys( +input: Expression, +function: Expression) + extends ArrayBasedHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = input.nullable + + override def dataType: DataType = { +val valueType = input.dataType.asInstanceOf[MapType].valueType +MapType(function.dataType, valueType, input.nullable) + } + + override def inputTypes: Seq[AbstractDataType] = Seq(MapType, expectingFunctionType) + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): + TransformKeys = { +val (keyElementType, valueElementType, containsNull) = input.dataType match { + case MapType(keyType, valueType, containsNullValue) => +(keyType, valueType, containsNullValue) + case _ => +val MapType(keyType, valueType, containsNullValue) = MapType.defaultConcreteType +(keyType, valueType, containsNullValue) +} +copy(function = f(function, (keyElementType, false) :: (valueElementType, containsNull) :: Nil)) + } + + @transient lazy val (keyVar, valueVar) = { +val LambdaFunction( +_, (keyVar: NamedLambdaVariable) :: (valueVar: NamedLambdaVariable) :: Nil, _) = function +(keyVar, valueVar) + } + + override def eval(input: InternalRow): Any = { +val arr = 
this.input.eval(input).asInstanceOf[MapData] +if (arr == null) { + null +} else { + val f = functionForEval + val resultKeys = new GenericArrayData(new Array[Any](arr.numElements)) + var i = 0 + while (i < arr.numElements) { +keyVar.value.set(arr.keyArray().get(i, keyVar.dataType)) +valueVar.value.set(arr.valueArray().get(i, valueVar.dataType)) +resultKeys.update(i, f.eval(input)) --- End diff -- This assumes that the transformation will return a unique key, right? If it doesn't, you'll break the map semantics. For example: `map_key(some_map, (k, v) -> 0)`
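A plain-Scala illustration of the concern (hypothetical helpers, not Spark code): `transform_keys` maps each key independently, so a non-injective function such as `(k, v) -> 0` turns distinct keys into duplicates. The second helper reports such collisions, which an implementation could use to fail fast.

```scala
// Plain-Scala illustration (not Spark code): map entries are modelled as a
// Seq of pairs so that duplicate keys stay visible instead of being merged.
object TransformKeysSketch {
  // Applies `f` to every entry to compute its new key; values are unchanged.
  def transformKeys[K, V, K2](entries: Seq[(K, V)])(f: (K, V) => K2): Seq[(K2, V)] =
    entries.map { case (k, v) => (f(k, v), v) }

  // Keys that occur more than once, i.e. entries that break map semantics.
  def duplicateKeys[K, V](entries: Seq[(K, V)]): Set[K] =
    entries.groupBy(_._1).collect { case (k, group) if group.size > 1 => k }.toSet
}
```

For the entries `1 -> 1, 2 -> 2, 3 -> 3`, the function `(k, _) => k + 1` keeps the keys distinct, whereas `(_, _) => 0` produces three entries with key `0`, which `duplicateKeys` reports as `Set(0)`.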
[GitHub] spark issue #22013: [SPARK-23939][SQL] Add transform_keys function
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22013 ok to test
[GitHub] spark issue #22012: [SPARK-25036][SQL] Should compare ExprValue.isNull with ...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/22012 LGTM
[GitHub] spark pull request #21986: [SPARK-23937][SQL] Add map_filter SQL function
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21986#discussion_r207954320 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -123,7 +125,10 @@ trait HigherOrderFunction extends Expression { } } -trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes { +/** + * Trait for functions having as input one argument and one function. + */ +trait UnaryHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes { --- End diff -- We use the term `Unary` a lot, and this is different from the other uses. The name should convey a HigherOrderFunction that only uses a single (lambda) function, right? The only thing I can come up with is `SingleHigherOrderFunction`. `Simple` would probably also be fine.
[GitHub] spark issue #21982: [SPARK-23911][SQL] Add aggregate function.
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/21982 retest this please
[GitHub] spark pull request #21965: [SPARK-23909][SQL] Add filter function.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21965#discussion_r207480086 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -210,3 +219,54 @@ case class ArrayTransform( override def prettyName: String = "transform" } + +/** + * Filters the input array using the given lambda function. + */ +@ExpressionDescription( + usage = "_FUNC_(expr, func) - Filters the input array using the given predicate.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), x -> x % 2 == 1); + array(1, 3) + """, + since = "2.4.0") +case class ArrayFilter( +input: Expression, +function: Expression) + extends ArrayBasedHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = input.nullable + + override def dataType: DataType = input.dataType + + override def expectingFunctionType: AbstractDataType = BooleanType + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArrayFilter = { +val elem = ArrayBasedHigherOrderFunction.elementArgumentType(input.dataType) +copy(function = f(function, elem :: Nil)) + } + + @transient lazy val LambdaFunction(_, Seq(elementVar: NamedLambdaVariable), _) = function + + override def eval(input: InternalRow): Any = { +val arr = this.input.eval(input).asInstanceOf[ArrayData] +if (arr == null) { + null +} else { + val f = functionForEval + val buffer = new mutable.ArrayBuffer[Any] + var i = 0 + while (i < arr.numElements) { +elementVar.value.set(arr.get(i, elementVar.dataType)) +if (f.eval(input).asInstanceOf[Boolean]) { + buffer += elementVar.value.get +} +i += 1 + } + new GenericArrayData(buffer) +} + } + + override def prettyName: String = "filter" --- End diff -- Is filter too generic? wdyt?
[GitHub] spark pull request #21965: [SPARK-23909][SQL] Add filter function.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21965#discussion_r207479758 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -210,3 +219,54 @@ case class ArrayTransform( override def prettyName: String = "transform" } + +/** + * Filters the input array using the given lambda function. + */ +@ExpressionDescription( + usage = "_FUNC_(expr, func) - Filters the input array using the given predicate.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), x -> x % 2 == 1); + array(1, 3) + """, + since = "2.4.0") +case class ArrayFilter( +input: Expression, +function: Expression) + extends ArrayBasedHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = input.nullable + + override def dataType: DataType = input.dataType + + override def expectingFunctionType: AbstractDataType = BooleanType + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArrayFilter = { +val elem = ArrayBasedHigherOrderFunction.elementArgumentType(input.dataType) +copy(function = f(function, elem :: Nil)) + } + + @transient lazy val LambdaFunction(_, Seq(elementVar: NamedLambdaVariable), _) = function + + override def eval(input: InternalRow): Any = { +val arr = this.input.eval(input).asInstanceOf[ArrayData] +if (arr == null) { + null +} else { + val f = functionForEval + val buffer = new mutable.ArrayBuffer[Any] --- End diff -- I am wondering whether we should use a buffer builder with a size hint here, or alternatively manage the array ourselves.
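The two alternatives can be sketched on a plain `Array[Any]` instead of Spark's `ArrayData` (hypothetical names; this is not the code that was merged): a builder with a size hint, or a pre-sized array trimmed once at the end. Since a filter never returns more elements than its input, the input length is a safe upper bound for both.

```scala
// Sketch on plain arrays, not Spark's ArrayData/GenericArrayData.
object FilterBufferSketch {
  // (a) Builder with a size hint: the input length is an upper bound.
  def filterWithBuilder(input: Array[Any], p: Any => Boolean): Array[Any] = {
    val builder = Array.newBuilder[Any]
    builder.sizeHint(input.length)
    var i = 0
    while (i < input.length) {
      if (p(input(i))) builder += input(i)
      i += 1
    }
    builder.result()
  }

  // (b) Manage the array ourselves: fill a full-size array, then copy
  // only the used prefix once at the end.
  def filterManual(input: Array[Any], p: Any => Boolean): Array[Any] = {
    val out = new Array[Any](input.length)
    var n = 0
    var i = 0
    while (i < input.length) {
      if (p(input(i))) {
        out(n) = input(i)
        n += 1
      }
      i += 1
    }
    out.take(n)
  }
}
```

Option (a) avoids repeated growth of the buffer; option (b) trades one oversized allocation for a single final copy, which may matter when most elements pass the predicate.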
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r207171941 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.concurrent.atomic.AtomicReference + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.types._ + +/** + * A named lambda variable. + */ +case class NamedLambdaVariable( +name: String, +dataType: DataType, +nullable: Boolean, +value: AtomicReference[Any] = new AtomicReference(), --- End diff -- Yeah, that makes sense. Let's leave it for now.
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r207158636 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.concurrent.atomic.AtomicReference + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.types._ + +/** + * A named lambda variable. + */ +case class NamedLambdaVariable( +name: String, +dataType: DataType, +nullable: Boolean, +value: AtomicReference[Any] = new AtomicReference(), --- End diff -- You did? Could you elaborate? There shouldn't be any concurrent access here.
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r207145478 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.concurrent.atomic.AtomicReference + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.types._ + +/** + * A named lambda variable. + */ +case class NamedLambdaVariable( +name: String, +dataType: DataType, +nullable: Boolean, +value: AtomicReference[Any] = new AtomicReference(), --- End diff -- You are only using the `AtomicReference` as a container, right?
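Assuming the lambda variable is only ever set and read by the single thread evaluating the expression, an `AtomicReference` used purely as a box behaves the same as a plain mutable field. The sketch below uses hypothetical `SlotSketch`, `AtomicSlot` and `PlainSlot` types (not Spark classes) to show both holders driving the same per-element evaluation.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical holders for "the current value of a lambda argument".
object SlotSketch {
  trait Slot {
    def set(v: Any): Unit
    def get: Any
  }

  // Box backed by an AtomicReference, as in the code under review.
  final class AtomicSlot extends Slot {
    private val ref = new AtomicReference[Any]()
    def set(v: Any): Unit = ref.set(v)
    def get: Any = ref.get()
  }

  // Plain mutable field: sufficient when one thread both sets and reads it.
  final class PlainSlot extends Slot {
    private var current: Any = null
    def set(v: Any): Unit = { current = v }
    def get: Any = current
  }

  // Single-threaded evaluation loop: store each element in the slot, then run
  // the body against whatever the slot currently holds.
  def mapWith(slot: Slot, input: Seq[Any])(body: Any => Any): Seq[Any] =
    input.map { elem =>
      slot.set(elem)
      body(slot.get)
    }
}
```

The atomic variant only pays off if another thread could observe the slot; without concurrent access it is just a more expensive container.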
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r207141916 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.concurrent.atomic.AtomicReference + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.types._ + +/** + * A named lambda variable. 
+ */ +case class NamedLambdaVariable( +name: String, +dataType: DataType, +nullable: Boolean, +value: AtomicReference[Any] = new AtomicReference(), +exprId: ExprId = NamedExpression.newExprId) + extends LeafExpression + with NamedExpression + with CodegenFallback { + + override def qualifier: Option[String] = None + + override def newInstance(): NamedExpression = +copy(value = new AtomicReference(), exprId = NamedExpression.newExprId) + + override def toAttribute: Attribute = { +AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None) + } + + override def eval(input: InternalRow): Any = value.get + + override def toString: String = s"lambda $name#${exprId.id}$typeSuffix" + + override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}" +} + +/** + * A lambda function and its arguments. A lambda function can be hidden when a user wants to + * process a completely independent expression in a [[HigherOrderFunction]], the lambda function + * and its variables are then only used for internal bookkeeping within the higher order function. + */ +case class LambdaFunction( +function: Expression, +arguments: Seq[NamedExpression], +hidden: Boolean = false) + extends Expression with CodegenFallback { + + override def children: Seq[Expression] = function +: arguments + override def dataType: DataType = function.dataType + override def nullable: Boolean = function.nullable + + lazy val bound: Boolean = arguments.forall(_.resolved) + + override def eval(input: InternalRow): Any = function.eval(input) +} + +/** + * A higher order function takes one or more (lambda) functions and applies these to some objects. + * The function produces a number of variables which can be consumed by some lambda function. + */ +trait HigherOrderFunction extends Expression { + + override def children: Seq[Expression] = inputs ++ functions + + /** + * Inputs to the higher order function.
+ */ + def inputs: Seq[Expression] + + /** + * All inputs have been resolved. This means that the types and nullability of (most of) the + * lambda function arguments are known, and that we can start binding the lambda functions. + */ + lazy val inputResolved: Boolean = inputs.forall(_.resolved) + + /** + * Functions applied by the higher order function. + */ + def functions: Seq[Expression] + + /** + * All inputs must be resolved and all functions must be resolved lambda functions. + */ + override lazy val resolved: Boolean = inputResolved && functions.forall { +case l: LambdaFunction => l.resolved +case _ => false + } + + /** + * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The + * bind function takes the potential lambda and its (partial) arguments and converts this into + * a bound lambda function. + */ + def bind(f: (Expression, Seq[(Da
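The roles of `NamedLambdaVariable`, `LambdaFunction` and a higher-order function in the diff above can be modelled in a few lines. This is a toy sketch under loud assumptions, not Spark's implementation: every name is hypothetical, there is no analysis or `bind` step, and no types are checked. The higher-order function writes each array element into the lambda's variable and then evaluates the lambda body, which reads that variable.

```scala
// Toy model only; none of these types are Spark classes.
object HofSketch {
  sealed trait Expr { def eval(): Any }

  final case class Lit(v: Any) extends Expr { def eval(): Any = v }

  // Analogue of a named lambda variable: a slot the higher-order function fills in.
  final class LambdaVar(val name: String) extends Expr {
    var value: Any = null
    def eval(): Any = value
  }

  final case class Add(l: Expr, r: Expr) extends Expr {
    def eval(): Any = l.eval().asInstanceOf[Int] + r.eval().asInstanceOf[Int]
  }

  // Analogue of a lambda function: a body plus the variables it binds.
  final case class Lambda(body: Expr, args: Seq[LambdaVar]) extends Expr {
    def eval(): Any = body.eval()
  }

  // Analogue of a transform-style higher-order function: for each element,
  // set the lambda's argument and evaluate the lambda.
  final case class Transform(input: Seq[Any], fn: Lambda) {
    def eval(): Seq[Any] = {
      val elemVar = fn.args.head
      input.map { elem =>
        elemVar.value = elem
        fn.eval()
      }
    }
  }
}
```

For example, `Transform(Seq(1, 2, 3), Lambda(Add(x, Lit(1)), Seq(x)))` with `x = new LambdaVar("x")` evaluates to `Seq(2, 3, 4)`.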
[GitHub] spark issue #21930: [SPARK-14540][Core] Fix remaining major issues for Scala...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/21930 Yeah, I would not worry about it.
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r206540368 --- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala --- @@ -123,7 +123,7 @@ abstract class TaskContext extends Serializable { * * Exceptions thrown by the listener will result in failure of the task. */ - def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = { + def addTaskCompletionListener[U](f: (TaskContext) => U): TaskContext = { --- End diff -- Do we need to change this? I don't think it is a problem for binary compatibility, but it seems a bit weird since we don't use the result of the function.
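To illustrate the point about the unused result, here is a sketch with a hypothetical `SketchContext` (not Spark's `TaskContext`): with a type parameter `U`, any callback is accepted and its return value is simply discarded when the listener is stored. It does not model the Scala 2.12 compatibility work that this PR is about.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for a task context.
final class SketchContext {
  private val listeners = mutable.ArrayBuffer.empty[SketchContext => Unit]

  // Generic result type U: callbacks returning anything are accepted, but the
  // result is thrown away when the callback is wrapped as a Unit function.
  def addCompletionListener[U](f: SketchContext => U): SketchContext = {
    val wrapped: SketchContext => Unit = ctx => { f(ctx); () }
    listeners += wrapped
    this
  }

  // Runs every registered listener once.
  def markCompleted(): Unit = listeners.foreach(listener => listener(this))
}
```

Whether `U` is `Unit` or `String`, the stored listener has the same shape, which is why the extra type parameter adds nothing for the caller beyond what the function's result would otherwise be discarded to.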
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r205967625 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3968,3 +3964,242 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike +with ComplexTypeMergingExpression { + override def dataType: DataType = { +dataTypeCheck +left.dataType + } + + @transient lazy val evalExcept: (ArrayData, ArrayData) => ArrayData = { +if (elementTypeSupportEquals) { + (array1, array2) => +val hs = new OpenHashSet[Any] --- End diff -- I would be strongly in favor of just using `MemoryBlock` for binary types, or something similar.
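The underlying issue is that `Array[Byte]` compares by reference on the JVM, so a generic hash set never matches two byte arrays with equal contents that are distinct objects. The reviewer suggests `MemoryBlock` or something similar; this sketch instead swaps in a hypothetical `BinaryKey` wrapper built on `java.util.Arrays.equals` and `java.util.Arrays.hashCode` to show the idea, and follows the documented semantics of returning elements of the first array that are not in the second, without duplicates.

```scala
import scala.collection.mutable

object BinaryExceptSketch {
  // Hypothetical wrapper that gives byte arrays value-based equality and hashing.
  final class BinaryKey(val bytes: Array[Byte]) {
    override def equals(other: Any): Boolean = other match {
      case that: BinaryKey => java.util.Arrays.equals(bytes, that.bytes)
      case _ => false
    }
    override def hashCode(): Int = java.util.Arrays.hashCode(bytes)
  }

  // Elements of `left` not present in `right`, without duplicates, in the
  // order they first appear in `left`.
  def except(left: Seq[Array[Byte]], right: Seq[Array[Byte]]): Seq[Array[Byte]] = {
    val seen = mutable.HashSet.empty[BinaryKey]
    right.foreach(b => seen += new BinaryKey(b))
    val out = mutable.ArrayBuffer.empty[Array[Byte]]
    left.foreach { b =>
      val key = new BinaryKey(b)
      if (!seen.contains(key)) {
        seen += key // also suppresses later duplicates from `left`
        out += b
      }
    }
    out.toList
  }
}
```

Without the wrapper, `mutable.HashSet(Array[Byte](1, 2)).contains(Array[Byte](1, 2))` is `false`, which is exactly the failure mode a value-based key avoids.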
[GitHub] spark issue #21897: [minor] Improve documentation for HiveStringType's
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/21897 @rxin no, they should not have been public. IMO we should just hide them for 3.0.
[GitHub] spark issue #21897: [minor] Improve documentation for HiveStringType's
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/21897 LGTM
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/21821 LGTM
[GitHub] spark pull request #21840: [WIP] New copy() method for Column of StructType
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21840#discussion_r204476440 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3858,3 +3858,29 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +case class StructCopy( +struct: Expression, +fieldName: String, +fieldValue: Expression) extends Expression with CodegenFallback { + + override def children: Seq[Expression] = Seq(struct, fieldValue) + override def nullable: Boolean = struct.nullable + + lazy val fieldIndex = struct.dataType.asInstanceOf[StructType].fieldIndex(fieldName) + + override def dataType: DataType = { +val structType = struct.dataType.asInstanceOf[StructType] +val field = structType.fields(fieldIndex).copy(dataType = fieldValue.dataType) + +structType.copy(fields = structType.fields.updated(fieldIndex, field)) + } + + override def eval(input: InternalRow): Any = { +val newFieldValue = fieldValue.eval(input) +val structValue = struct.eval(input).asInstanceOf[GenericInternalRow] --- End diff -- You cannot assume this, and you also cannot update the row in place. You will need to copy the row, I am afraid.
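A sketch of the copy-instead-of-mutate approach, using a hypothetical `RowLike` interface rather than Spark's `InternalRow`. Because the evaluated struct could be backed by a different row implementation (for example an `UnsafeRow` rather than a `GenericInternalRow`), the code reads fields only through the interface, copies them into a fresh array, replaces one slot and returns a new row, leaving the input untouched.

```scala
// Hypothetical types for illustration; not Spark's row classes.
object StructCopySketch {
  // Minimal row interface: callers must not depend on the concrete class.
  trait RowLike {
    def numFields: Int
    def get(i: Int): Any
  }

  // One possible array-backed implementation.
  final class ArrayRow(private val values: Array[Any]) extends RowLike {
    def numFields: Int = values.length
    def get(i: Int): Any = values(i)
  }

  // Copy all fields through the interface, replace one, and return a new row.
  // A null struct stays null; the input row is never mutated.
  def withField(row: RowLike, fieldIndex: Int, newValue: Any): RowLike =
    if (row == null) null
    else {
      val copied = new Array[Any](row.numFields)
      var i = 0
      while (i < copied.length) {
        copied(i) = row.get(i)
        i += 1
      }
      copied(fieldIndex) = newValue
      new ArrayRow(copied)
    }
}
```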
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/21821 @gatorsmile do we still need this patch if maryann fixes this?