[GitHub] spark pull request #22775: [SPARK-24709][SQL][FOLLOW-UP] Make schema_of_json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22775#discussion_r227237973
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
@@ -770,8 +776,17 @@ case class SchemaOfJson(
     factory
   }
-  override def convert(v: UTF8String): UTF8String = {
-    val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, v)) { parser =>
+  @transient
+  private lazy val json = child.eval().asInstanceOf[UTF8String]
--- End diff --
is it possible that users want to run `schema_of_json` on a string column of a table?
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
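To make the question concrete: `child.eval()` with no input row only works when the argument is a constant, whereas a string column needs one inference per row. The Scala sketch below models both cases with hypothetical toy types (`Expr`, `Lit`, `Col`, `SchemaOf`), not Spark's classes, and replaces real JSON schema inference with a stand-in that only inspects the first character.

```scala
// Hypothetical toy types; not Spark's Expression or SchemaOfJson.
object FoldableSketch {
  sealed trait Expr {
    def foldable: Boolean
    def eval(row: Map[String, String]): String
  }
  // A literal ignores the row, so it can be evaluated once up front.
  final case class Lit(value: String) extends Expr {
    val foldable = true
    def eval(row: Map[String, String]): String = value
  }
  // A column reference depends on the row, so it must be evaluated per row.
  final case class Col(name: String) extends Expr {
    val foldable = false
    def eval(row: Map[String, String]): String = row(name)
  }

  // Stand-in for schema inference: only inspects the first non-blank character.
  def inferKind(json: String): String = json.trim.headOption match {
    case Some('{') => "struct"
    case Some('[') => "array"
    case _         => "string"
  }

  final case class SchemaOf(child: Expr) {
    var inferCalls = 0 // how many times inference actually ran
    // Cached result, computed at most once; only used when the child is foldable.
    private lazy val cached: String = { inferCalls += 1; inferKind(child.eval(Map.empty)) }

    def eval(row: Map[String, String]): String =
      if (child.foldable) cached
      else { inferCalls += 1; inferKind(child.eval(row)) }
  }
}
```

With a `Lit` child, inference runs once however many rows are processed; with a `Col` child it runs once per row, which a `lazy val` over `child.eval()` alone cannot express.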
[GitHub] spark issue #22797: [SPARK-19851][SQL] Add support for EVERY and ANY (SOME) ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22797 Hi @dilipbiswal, sorry for the back and forth; can you try one more approach? Basically, we want to evaluate what the simplest logical rewrite looks like. We can create unevaluable EVERY and ANY expressions, and create a rule to replace them with MIN and MAX, respectively. This rule can be put at the beginning of the optimizer, like `RewriteDistinctAggregates`. We don't need to create a complete framework to rewrite aggregate functions.
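A minimal Scala sketch of the suggested rewrite, assuming SQL NULL semantics (aggregates skip NULLs and return NULL when no input is non-NULL): because `false < true`, `EVERY(b)` behaves like `MIN(b)` and `ANY(b)`/`SOME(b)` like `MAX(b)`. The types (`Agg`, `Every`, `AnyAgg`, `Min`, `Max`) are hypothetical toys, not Spark's optimizer API; `Every` and `AnyAgg` refuse to evaluate, mirroring the "unevaluable until rewritten" idea.

```scala
// Hypothetical toy aggregates over one boolean column (None stands for SQL NULL).
object BoolAggRewrite {
  sealed trait Agg
  case object Every extends Agg  // unevaluable: only exists until the rewrite runs
  case object AnyAgg extends Agg // ANY / SOME
  case object Min extends Agg
  case object Max extends Agg

  // The rewrite rule: EVERY -> MIN, ANY/SOME -> MAX (false sorts before true).
  def rewrite(agg: Agg): Agg = agg match {
    case Every  => Min
    case AnyAgg => Max
    case other  => other
  }

  // MIN/MAX skip NULLs and return NULL when there are no non-NULL inputs.
  def eval(agg: Agg, values: Seq[Option[Boolean]]): Option[Boolean] = {
    val present = values.flatten
    agg match {
      case Min   => if (present.isEmpty) None else Some(present.min)
      case Max   => if (present.isEmpty) None else Some(present.max)
      case other => throw new IllegalStateException(s"$other must be rewritten before evaluation")
    }
  }
}
```

Keeping the rewrite as a single rule means no new evaluation code is needed for EVERY/ANY; they reuse the existing MIN/MAX implementations.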
[GitHub] spark issue #22800: [SPARK-24499][SQL][DOC][follow-up] Fix spelling in doc
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22800 thanks, merging to master/2.4!
[GitHub] spark issue #22800: [SPARK-24499][SQL][DOC][follow-up] Fix spelling in doc
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22800 LGTM
[GitHub] spark issue #22790: [SPARK-25793][ML]call SaveLoadV2_0.load for classNameV2_...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22790 cc @mengxr @WeichenXu123 how serious is it? shall we treat it as a blocker?
[GitHub] spark issue #22799: [SPARK-25805][SQL][TEST] Fix test for SPARK-25159
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22799 LGTM
[GitHub] spark pull request #22788: [SPARK-25769][SQL]escape nested columns by backti...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22788#discussion_r227203199
--- Diff: sql/core/src/test/resources/sql-tests/results/columnresolution-negative.sql.out ---
@@ -81,7 +81,7 @@ SELECT t1.i1 FROM t1, mydb1.t1
 struct<>
 -- !query 9 output
 org.apache.spark.sql.AnalysisException
-Reference 't1.i1' is ambiguous, could be: mydb1.t1.i1, mydb1.t1.i1.; line 1 pos 7
+Reference '`t1`.`i1`' is ambiguous, could be: mydb1.t1.i1, mydb1.t1.i1.; line 1 pos 7
--- End diff --
These examples only make sense when we have the outer backticks. e.g. `'t1.i1'` is good.
[GitHub] spark pull request #22788: [SPARK-25769][SQL]escape nested columns by backti...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22788#discussion_r227202767
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala ---
@@ -99,7 +99,7 @@ case class UnresolvedTableValuedFunction(
 case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute with Unevaluable {
   def name: String =
-    nameParts.map(n => if (n.contains(".")) s"`$n`" else n).mkString(".")
+    nameParts.map(n => if (nameParts.length > 1 || n.contains(".")) s"`$n`" else n).mkString(".")
--- End diff --
I don't think this is better for `name`; we should update `sql`, though.
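One way to read the distinction, as a sketch with hypothetical helpers rather than Spark's code: `name` keeps the existing rule of quoting only parts that contain a dot, while `sql` quotes every part so the text can be parsed back unambiguously. Doubling embedded backticks in `sql` is an assumption added here for safety.

```scala
// Hypothetical helpers illustrating two renderings of a multi-part attribute name.
object AttributeNames {
  // Quote one part with backticks, doubling any embedded backtick (assumed escaping rule).
  private def quote(part: String): String = "`" + part.replace("`", "``") + "`"

  // `name`-style: quote only parts containing a dot (the pre-patch rule in the diff).
  def name(parts: Seq[String]): String =
    parts.map(p => if (p.contains(".")) s"`$p`" else p).mkString(".")

  // `sql`-style: quote every part, so the output is always re-parseable.
  def sql(parts: Seq[String]): String = parts.map(quote).mkString(".")
}
```

Keeping `name` terse and making only `sql` fully quoted means user-facing messages stay short, while generated SQL text remains safe to parse.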
[GitHub] spark issue #22512: [SPARK-25498][SQL] InterpretedMutableProjection should h...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22512 LGTM, we also need a unit test
[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22512#discussion_r227202171
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
@@ -140,6 +141,14 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
     val input = fileToString(new File(testCase.inputFile))
     val (comments, code) = input.split("\n").partition(_.startsWith("--"))
+
+    // Runs all the tests on both codegen-only and interpreter modes. Since explain results differ
+    // when `WHOLESTAGE_CODEGEN_ENABLED` disabled, we don't run these tests now.
+    val codegenConfigSets = Array(("false", "NO_CODEGEN"), ("true", "CODEGEN_ONLY")).map {
+      case (wholeStageCodegenEnabled, codegenFactoryMode) =>
+        Array( // SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStageCodegenEnabled,
--- End diff --
If `wholeStageCodegenEnabled` is not used, let's not complicate the code now.
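If only the factory mode varies, the config sets can be built from a flat list of modes. A sketch, assuming `spark.sql.codegen.factoryMode` is the key behind the codegen factory mode setting (in the suite it should come from `SQLConf` rather than a hard-coded string); `CodegenConfigSets` and `runWithEach` are hypothetical names.

```scala
// Hypothetical simplification: one config set per factory mode, no unused tuple element.
object CodegenConfigSets {
  // Assumed key; prefer the constant from SQLConf in real code.
  val CodegenFactoryModeKey = "spark.sql.codegen.factoryMode"

  val codegenConfigSets: Seq[Seq[(String, String)]] =
    Seq("NO_CODEGEN", "CODEGEN_ONLY").map(mode => Seq(CodegenFactoryModeKey -> mode))

  // Runs `body` once per config set and collects the results for comparison.
  def runWithEach[T](body: Map[String, String] => T): Seq[T] =
    codegenConfigSets.map(configs => body(configs.toMap))
}
```

If whole-stage codegen is reintroduced later, the mode list can grow back into tuples at that point.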
[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22512#discussion_r227200656
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
@@ -49,10 +51,54 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable
   def currentValue: InternalRow = mutableRow
   override def target(row: InternalRow): MutableProjection = {
+    // If `mutableRow` is `UnsafeRow`, `MutableProjection` accepts fixed-length types only
+    assert(!row.isInstanceOf[UnsafeRow] ||
+      validExprs.forall { case (e, _) => UnsafeRow.isFixedLength(e.dataType) })
     mutableRow = row
     this
   }
+  private[this] val fieldWriters = validExprs.map { case (e, i) =>
+    val writer = generateRowWriter(i, e.dataType)
+    if (!e.nullable) {
+      (v: Any) => writer(v)
+    } else {
+      (v: Any) => {
+        if (v == null) {
+          mutableRow.setNullAt(i)
+        } else {
+          writer(v)
+        }
+      }
+    }
+  }
+
+  private def generateRowWriter(ordinal: Int, dt: DataType): Any => Unit = dt match {
--- End diff --
we have `InternalRow.getAccessor`, shall we move this method there too?
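A sketch of what moving the writer next to an accessor factory could look like: the writer takes the target row as an argument instead of closing over `mutableRow`, so it no longer depends on projection state. Everything here (`DType`, `MutableRow`, `RowWriters.writer`) is a hypothetical toy, not Spark's `InternalRow` API.

```scala
// Hypothetical toy types standing in for DataType and a mutable InternalRow.
object RowWriters {
  sealed trait DType
  case object IntT extends DType
  case object LongT extends DType
  case object StringT extends DType

  final class MutableRow(n: Int) {
    val values: Array[Any] = new Array[Any](n)
    def setInt(i: Int, v: Int): Unit = values(i) = v
    def setLong(i: Int, v: Long): Unit = values(i) = v
    def update(i: Int, v: Any): Unit = values(i) = v
  }

  // The row is a parameter, so the factory is stateless and can live in a shared object.
  def writer(ordinal: Int, dt: DType): (MutableRow, Any) => Unit = dt match {
    case IntT    => (row, v) => row.setInt(ordinal, v.asInstanceOf[Int])
    case LongT   => (row, v) => row.setLong(ordinal, v.asInstanceOf[Long])
    case StringT => (row, v) => row.update(ordinal, v)
  }
}
```

The projection would then build its writers once and pass the current target row on each call.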
[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22512#discussion_r227200458
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
@@ -49,10 +51,54 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable
   def currentValue: InternalRow = mutableRow
   override def target(row: InternalRow): MutableProjection = {
+    // If `mutableRow` is `UnsafeRow`, `MutableProjection` accepts fixed-length types only
+    assert(!row.isInstanceOf[UnsafeRow] ||
+      validExprs.forall { case (e, _) => UnsafeRow.isFixedLength(e.dataType) })
     mutableRow = row
     this
   }
+  private[this] val fieldWriters = validExprs.map { case (e, i) =>
+    val writer = generateRowWriter(i, e.dataType)
+    if (!e.nullable) {
+      (v: Any) => writer(v)
+    } else {
+      (v: Any) => {
+        if (v == null) {
+          mutableRow.setNullAt(i)
+        } else {
+          writer(v)
+        }
+      }
+    }
+  }
+
+  private def generateRowWriter(ordinal: Int, dt: DataType): Any => Unit = dt match {
+    case BooleanType =>
+      v => mutableRow.setBoolean(ordinal, v.asInstanceOf[Boolean])
+    case ByteType =>
+      v => mutableRow.setByte(ordinal, v.asInstanceOf[Byte])
+    case ShortType =>
+      v => mutableRow.setShort(ordinal, v.asInstanceOf[Short])
+    case IntegerType | DateType =>
+      v => mutableRow.setInt(ordinal, v.asInstanceOf[Int])
+    case LongType | TimestampType =>
+      v => mutableRow.setLong(ordinal, v.asInstanceOf[Long])
+    case FloatType =>
+      v => mutableRow.setFloat(ordinal, v.asInstanceOf[Float])
+    case DoubleType =>
+      v => mutableRow.setDouble(ordinal, v.asInstanceOf[Double])
+    case DecimalType.Fixed(precision, _) =>
+      v => mutableRow.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
+    case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
+         _: MapType | _: UserDefinedType[_] =>
+      v => mutableRow.update(ordinal, v)
+    case NullType =>
+      v => {}
--- End diff --
shall we call `mutableRow.setNullAt`?
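A toy illustration of why an explicit `setNullAt` is safer than a no-op for `NullType`: with a reused mutable row, a no-op writer leaves whatever the previous row stored in that slot. Whether the non-nullable path is actually reachable for a `NullType` expression in Spark is not established here; the sketch (hypothetical `ReusedRow`, `nullSafe`, and writers) only shows the mechanics.

```scala
// Hypothetical toy row and writers; not Spark's InterpretedMutableProjection.
object NullTypeWriter {
  // A row whose slots are overwritten on every projection.
  final class ReusedRow(n: Int) {
    val values: Array[Any] = Array.fill[Any](n)(null)
    def update(i: Int, v: Any): Unit = values(i) = v
    def setNullAt(i: Int): Unit = values(i) = null
  }

  // Wraps a raw writer with a null check, like the nullable branch in the diff.
  def nullSafe(row: ReusedRow, i: Int, nullable: Boolean)(write: Any => Unit): Any => Unit =
    if (!nullable) write
    else (v: Any) => if (v == null) row.setNullAt(i) else write(v)

  // Two candidate NullType writers: do nothing, or explicitly clear the slot.
  val noOpWriter: Any => Unit = _ => ()
  def setNullWriter(row: ReusedRow, i: Int): Any => Unit = _ => row.setNullAt(i)
}
```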
[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22745#discussion_r227037566
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala ---
@@ -278,24 +278,20 @@ object JavaTypeInference {
       case _ if mapType.isAssignableFrom(typeToken) =>
         val (keyType, valueType) = mapKeyValueType(typeToken)
-        val keyDataType = inferDataType(keyType)._1
-        val valueDataType = inferDataType(valueType)._1
         val keyData =
           Invoke(
-            MapObjects(
+            UnresolvedMapObjects(
               p => deserializerFor(keyType, Some(p)),
-              Invoke(getPath, "keyArray", ArrayType(keyDataType)),
-              keyDataType),
+              UnresolvedGetArrayFromMap(getPath, GetArrayFromMap.Key())),
--- End diff --
Yeah, we can write `eval` and `doGenCode` from scratch. It's also more efficient, since we can omit the useless try-catch in `Invoke`, e.g.
```
// from UnaryExpression
override def nullSafeEval(input: Any): Any = input.asInstanceOf[MapData].keyArray()
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
  defineCodeGen(ctx, ev, c => s"$c.keyArray()")
```
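The pattern the comment relies on, sketched with hypothetical toy classes rather than Spark's `UnaryExpression`/`MapData`: the template handles null propagation once, and the subclass implements `nullSafeEval` as a plain method call, with no reflective call or exception wrapping around it.

```scala
// Hypothetical toy classes illustrating the null-safe evaluation template.
object NullSafeEvalSketch {
  abstract class ToyUnaryExpression {
    def nullSafeEval(input: Any): Any
    // Null in, null out; subclasses only handle the non-null case.
    final def eval(input: Any): Any =
      if (input == null) null else nullSafeEval(input)
  }

  // Stand-in for a map value that exposes its keys as an array.
  final case class ToyMapData(keys: Array[Any], values: Array[Any])

  // Direct implementation: a plain field access, nothing else around it.
  object ToyMapKeys extends ToyUnaryExpression {
    def nullSafeEval(input: Any): Any = input.asInstanceOf[ToyMapData].keys
  }
}
```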
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19788 BTW, let's add a config for this feature. We may enable adaptive execution by default in the future, and we should still allow users to run Spark with the legacy shuffle service. We should also throw a meaningful exception asking users to turn off this config or upgrade the shuffle service if an incompatibility occurs.
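A minimal Scala sketch of the guard described above. The config key, the exception class, and the `serviceSupportsBatchFetch` capability flag are all hypothetical; how Spark would actually detect an old shuffle service is not specified in the comment. The feature is assumed to default to on.

```scala
// Hypothetical guard for an optional shuffle-fetch optimization.
object ContinuousFetchGuard {
  // Hypothetical config key, not an actual Spark setting.
  val ConfigKey = "spark.example.shuffle.fetchContinuousBlocksInBatch.enabled"

  final class IncompatibleShuffleServiceException(msg: String) extends RuntimeException(msg)

  // Returns whether to use the optimized path; fails with an actionable message
  // when the feature is on but the shuffle service cannot handle it.
  def useBatchFetch(conf: Map[String, String], serviceSupportsBatchFetch: Boolean): Boolean = {
    val enabled = conf.getOrElse(ConfigKey, "true").toBoolean
    if (enabled && !serviceSupportsBatchFetch) {
      throw new IncompatibleShuffleServiceException(
        "The shuffle service does not support batch fetch of contiguous blocks. " +
          s"Set $ConfigKey=false or upgrade the shuffle service.")
    }
    enabled
  }
}
```

Putting the config key in the message tells users exactly which switch to flip.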
[GitHub] spark issue #22786: [SPARK-25764][ML][EXAMPLES] Update BisectingKMeans examp...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22786 also cc @WeichenXu123
[GitHub] spark issue #20820: [SPARK-23676][SQL]Support left join codegen in SortMerge...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20820 This is good to have, but we should follow `BroadcastHashJoinExec` and make the implementation more structured, e.g. with separate `codegenInner`, `codegenOuter`, etc.
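A skeletal sketch of the suggested structure: one dispatch point that delegates to a generator per join type. The generators return placeholder strings, and all names besides `codegenInner`/`codegenOuter` (which come from the comment) are hypothetical.

```scala
// Hypothetical dispatch skeleton; generated code is reduced to placeholder comments.
object JoinCodegenDispatch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  private def codegenInner(): String = "/* emit a row only when a match is found */"
  private def codegenOuter(): String = "/* emit the streamed row, padding the other side with nulls */"

  // Each join type gets its own generator instead of one method full of branches.
  def doConsume(joinType: JoinType): String = joinType match {
    case Inner                  => codegenInner()
    case LeftOuter | RightOuter => codegenOuter()
    case FullOuter =>
      throw new UnsupportedOperationException("full outer join codegen is not sketched here")
  }
}
```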
[GitHub] spark pull request #22788: [SPARK-25769][SQL]change nested columns from `a.b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22788#discussion_r226988117
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
@@ -2702,7 +2702,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       val e = intercept[AnalysisException](sql("SELECT v.i from (SELECT i FROM v)"))
       assert(e.message ==
-        "cannot resolve '`v.i`' given input columns: [__auto_generated_subquery_name.i]")
--- End diff --
I think the problem here is the outermost backticks. Do you know where we add them?
[GitHub] spark pull request #22788: [SPARK-25769][SQL]change nested columns from `a.b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22788#discussion_r226987977
--- Diff: sql/core/src/test/resources/sql-tests/results/columnresolution-negative.sql.out ---
@@ -81,7 +81,7 @@ SELECT t1.i1 FROM t1, mydb1.t1
 struct<>
 -- !query 9 output
 org.apache.spark.sql.AnalysisException
-Reference 't1.i1' is ambiguous, could be: mydb1.t1.i1, mydb1.t1.i1.; line 1 pos 7
+Reference '`t1`.`i1`' is ambiguous, could be: mydb1.t1.i1, mydb1.t1.i1.; line 1 pos 7
--- End diff --
why is the new format better? it's more verbose, isn't it?
[GitHub] spark issue #17520: [WIP][SPARK-19712][SQL] Move PullupCorrelatedPredicates ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/17520 is it time to revisit it?
[GitHub] spark pull request #22047: [SPARK-19851] Add support for EVERY and ANY (SOME...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22047#discussion_r226985475
--- Diff: python/pyspark/sql/functions.py ---
@@ -403,6 +403,28 @@ def countDistinct(col, *cols):
     return Column(jc)
 
 
+def every(col):
--- End diff --
+1 for option 1
[GitHub] spark issue #21860: [SPARK-24901][SQL]Merge the codegen of RegularHashMap an...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21860 LGTM except the naming
[GitHub] spark pull request #21860: [SPARK-24901][SQL]Merge the codegen of RegularHas...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21860#discussion_r226981919
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -744,6 +744,7 @@ case class HashAggregateExec(
     val unsafeRowKeys = unsafeRowKeyCode.value
     val unsafeRowBuffer = ctx.freshName("unsafeRowAggBuffer")
     val fastRowBuffer = ctx.freshName("fastAggBuffer")
+    val updateAggBuffer = ctx.freshName("updateAggBuffer")
--- End diff --
`updatedAggBuffer`?
[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22745#discussion_r226981527
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala ---
@@ -278,24 +278,20 @@ object JavaTypeInference {
       case _ if mapType.isAssignableFrom(typeToken) =>
         val (keyType, valueType) = mapKeyValueType(typeToken)
-        val keyDataType = inferDataType(keyType)._1
-        val valueDataType = inferDataType(valueType)._1
         val keyData =
           Invoke(
-            MapObjects(
+            UnresolvedMapObjects(
               p => deserializerFor(keyType, Some(p)),
-              Invoke(getPath, "keyArray", ArrayType(keyDataType)),
-              keyDataType),
+              UnresolvedGetArrayFromMap(getPath, GetArrayFromMap.Key())),
--- End diff --
Seems we don't need to make it unresolved:
```
case class GetArrayFromMap(map: Expression, getKey: Boolean) extends Expression {
  override def inputTypes = Seq(MapType)
  override def dataType = {
    val MapType(kt, vt, _) = map.dataType.asInstanceOf[MapType]
    ArrayType(if (getKey) kt else vt)
  }
  override def eval...
  override def doGenCode...
}
```
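A toy model of the point that, once the child is resolved, the result type can be derived directly from the map type. The types are hypothetical stand-ins (`MapT` mirrors the key type, value type, and `valueContainsNull` fields of Spark's `MapType`), and the result is modeled as an array of the key or value type, since the expression extracts the key or value array.

```scala
// Hypothetical toy data types; not Spark's DataType hierarchy.
object GetArrayFromMapSketch {
  sealed trait DType
  case object IntT extends DType
  case object StringT extends DType
  final case class ArrayT(elementType: DType) extends DType
  final case class MapT(keyType: DType, valueType: DType, valueContainsNull: Boolean) extends DType

  // The result type follows directly from the (already resolved) child type.
  final case class GetArrayFromMap(childType: DType, getKey: Boolean) {
    def dataType: DType = childType match {
      case MapT(kt, vt, _) => ArrayT(if (getKey) kt else vt)
      case other           => throw new IllegalArgumentException(s"expected a map type, got $other")
    }
  }
}
```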
[GitHub] spark pull request #22785: [SPARK-25791][SQL] Datatype of serializers in Row...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22785#discussion_r226978705
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala ---
@@ -187,7 +187,7 @@ object RowEncoder {
       val convertedField = if (field.nullable) {
         If(
           Invoke(inputObject, "isNullAt", BooleanType, Literal(index) :: Nil),
-          Literal.create(null, field.dataType),
+          Literal.create(null, fieldValue.dataType),
--- End diff --
can we add a comment to explain it? `field.dataType` can be different from `fieldValue.dataType` because we strip UDTs.
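A toy model of the type mismatch: the schema field keeps the UDT, while the serializer's output type is the UDT's underlying SQL type, so a null literal typed with the schema type would not match the non-null branch. The types and `serializedType` are hypothetical, not Spark's `RowEncoder` logic.

```scala
// Hypothetical toy types showing how stripping UDTs changes the output type.
object UdtStripping {
  sealed trait DType
  case object IntT extends DType
  final case class StructT(fields: Seq[(String, DType)]) extends DType
  // A user-defined type is stored as its underlying SQL type.
  final case class UdtT(name: String, sqlType: DType) extends DType

  // Serializer output type: UDTs are replaced by their SQL type, recursively.
  def serializedType(dt: DType): DType = dt match {
    case UdtT(_, sqlType) => serializedType(sqlType)
    case StructT(fields)  => StructT(fields.map { case (n, t) => (n, serializedType(t)) })
    case other            => other
  }
}
```

Typing the null branch with the serializer's output type keeps both branches of the `If` consistent.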
[GitHub] spark pull request #22785: [SPARK-25791][SQL] Datatype of serializers in Row...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22785#discussion_r226978368
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala ---
@@ -171,7 +171,7 @@ object RowEncoder {
       if (inputObject.nullable) {
         If(IsNull(inputObject),
-          Literal.create(null, inputType),
+          Literal.create(null, nonNullOutput.dataType),
--- End diff --
I see, for this case it makes no difference, but it makes the code clearer
[GitHub] spark pull request #22785: [SPARK-25791][SQL] Datatype of serializers in Row...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22785#discussion_r226978811
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala ---
@@ -273,6 +273,16 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
     assert(e4.getMessage.contains("java.lang.String is not a valid external type"))
   }
+  test("SPARK-25791: Datatype of serializers should be accessible") {
+    val udtSQLType = new StructType().add("a", IntegerType)
+    val pythonUDT = new PythonUserDefinedType(udtSQLType, "pyUDT", "serializedPyClass")
--- End diff --
can we reproduce the bug with a normal UDT?
[GitHub] spark pull request #22785: [SPARK-25791][SQL] Datatype of serializers in Row...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22785#discussion_r226977895
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala ---
@@ -171,7 +171,7 @@ object RowEncoder {
       if (inputObject.nullable) {
         If(IsNull(inputObject),
-          Literal.create(null, inputType),
+          Literal.create(null, nonNullOutput.dataType),
--- End diff --
what's the difference?
[GitHub] spark pull request #22785: [SPARK-25791][SQL] Datatype of serializers in Row...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22785#discussion_r226977568
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala ---
@@ -273,6 +273,16 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
     assert(e4.getMessage.contains("java.lang.String is not a valid external type"))
   }
+  test("SPARK-25791: Datatype of serializers should be accessible") {
+    val udtSQLType = new StructType().add("a", IntegerType)
+    val pythonUDT = new PythonUserDefinedType(udtSQLType, "pyUDT", "serializedPyClass")
+    val schema = new StructType().add("pythonUDT", pythonUDT, true)
+    val encoder = RowEncoder(schema)
+    // scalastyle:off println
+    encoder.serializer.foreach(s => println(s.dataType))
--- End diff --
we shouldn't print in a test; can we use assert?
[GitHub] spark issue #21402: SPARK-24355 Spark external shuffle server improvement to...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21402 shall we close it since #22173 is merged?
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19788 Hi @yucai, good points on the performance concerns. Let's go with the previous approach: https://github.com/apache/spark/pull/19788#issuecomment-366887404 Sorry for the back and forth!
[GitHub] spark issue #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22575 do we have a full story about stream sql? is the `STREAM` keyword the only difference between stream sql and normal sql? also cc @tdas @zsxwing
[GitHub] spark issue #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22575 do we have a full story about stream sql? is the `STREAM` keyword the only difference between stream sql and normal sql? how could users define watermark with SQL? also cc @tdas @zsxwing
[GitHub] spark issue #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22575 Do we have a full story about streaming SQL? is the `STREAM` keyword the only difference between stream sql and normal sql? also cc @tdas @zsxwing
[GitHub] spark issue #22756: [SPARK-25758][ML] Deprecate computeCost on BisectingKMea...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22756 reverted from master. Let's move the discussion to https://github.com/apache/spark/pull/22764
[GitHub] spark issue #22763: [SPARK-25764][ML][EXAMPLES] Update BisectingKMeans examp...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22763 Ah, I see. @mgaido91, can you resubmit it and update the description? The method is not deprecated now.
[GitHub] spark issue #22501: [SPARK-25492][TEST] Refactor WideSchemaBenchmark to use ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22501 retest this please
[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22547 Let's move the high-level discussion to https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing
[GitHub] spark pull request #22750: [SPARK-25747][SQL] remove ColumnarBatchScan.needs...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22750#discussion_r226819049
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -168,10 +168,11 @@ case class FileSourceScanExec(
   // Note that some vals referring the file-based relation are lazy intentionally
   // so that this plan can be canonicalized on executor side too. See SPARK-23731.
-  override lazy val supportsBatch: Boolean = relation.fileFormat.supportBatch(
-    relation.sparkSession, StructType.fromAttributes(output))
+  override lazy val supportsBatch: Boolean = {
+    relation.fileFormat.supportBatch(relation.sparkSession, schema)
+  }
-  override lazy val needsUnsafeRowConversion: Boolean = {
+  private lazy val needsUnsafeRowConversion: Boolean = {
     if (relation.fileFormat.isInstanceOf[ParquetSource]) {
--- End diff --
Our Parquet reader has one more feature than ORC: if the vectorized reader is on but whole-stage codegen is off, we can still read Parquet in batches and return `ColumnarRow`s. In ORC, if whole-stage codegen is off, we also turn off the vectorized reader, so ORC never returns `ColumnarRow`s. However, I just found that this Parquet reader feature is gone. Let's send a new PR to either fix it or remove it completely.
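The behavior described in this comment, written as a small decision function. It encodes the comment's description (including the Parquet batch-without-codegen path, which the comment reports as currently gone), not a verified reading of Spark's scan code; all names are hypothetical.

```scala
// Hypothetical model of which output a file scan produces, per the comment above.
object ColumnarReadDecision {
  sealed trait Format
  case object Parquet extends Format
  case object Orc extends Format

  sealed trait Output
  case object ColumnarBatches extends Output // consumed directly by whole-stage codegen
  case object ColumnarRows extends Output    // read in batches, then iterated as ColumnarRows
  case object PlainRows extends Output       // row-based (non-vectorized) reader

  def readPath(format: Format, vectorized: Boolean, wholeStageCodegen: Boolean): Output =
    (format, vectorized, wholeStageCodegen) match {
      case (_, false, _)          => PlainRows
      case (_, true, true)        => ColumnarBatches
      case (Parquet, true, false) => ColumnarRows // the intended Parquet-only feature
      case (Orc, true, false)     => PlainRows    // ORC turns the vectorized reader off
    }
}
```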
[GitHub] spark issue #22756: [SPARK-25758][ML] Deprecate computeCost on BisectingKMea...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22756 shall we revert it from master as well? At least we need to update the message `This method is deprecated and will be removed in 3.0.0.`
[GitHub] spark pull request #22781: [MINOR][DOC] Fix the building document to describ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22781#discussion_r226816630
--- Diff: docs/building-spark.md ---
@@ -12,7 +12,7 @@ redirect_from: "building-with-maven.html"
 ## Apache Maven
 
 The Maven-based build is the build of reference for Apache Spark.
-Building Spark using Maven requires Maven 3.3.9 or newer and Java 8+.
+Building Spark using Maven requires Maven 3.3.9 or newer and Java 8.
--- End diff --
let's change to the expected maven version, thanks!
[GitHub] spark issue #22501: [SPARK-25492][TEST] Refactor WideSchemaBenchmark to use ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22501 seems jenkins is broken, cc @shaneknapp
```
Command "/tmp/tmp.JfFHaoRFPU/3.5/bin/python -c "import setuptools, tokenize;__file__='/home/jenkins/workspace/SparkPullRequestBuilder/python/setup.py';f=getattr(tokenize, 'open', open)(__file__);code=f.read().replace('\r\n', '\n');f.close();exec(compile(code, __file__, 'exec'))" develop --no-deps" failed with error code 1 in /home/jenkins/workspace/SparkPullRequestBuilder/python/
You are using pip version 10.0.1, however version 18.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
Cleaning up temporary directory - /tmp/tmp.JfFHaoRFPU
[error] running /home/jenkins/workspace/SparkPullRequestBuilder/dev/run-pip-tests ; received return code 1
```
[GitHub] spark issue #22750: [SPARK-25747][SQL] remove ColumnarBatchScan.needsUnsafeR...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22750 retest this please
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226812577
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Format.java ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * The base interface for data source v2. Implementations must have a public, 0-arg constructor.
+ *
+ * The major responsibility of this interface is to return a {@link Table} for read/write.
+ */
+@InterfaceStability.Evolving
+public interface Format extends DataSourceV2 {
--- End diff --
the write API has not been migrated and still needs `DataSourceV2`
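On the Javadoc's "public, 0-arg constructor" requirement: data source implementations are presumably located by class name and instantiated reflectively, which only works when such a constructor exists. A sketch with simplified, hypothetical `Format`/`Table` traits (not the interfaces in the diff); `load` uses standard Java reflection.

```scala
// Hypothetical, simplified stand-ins for the interfaces in the diff.
object ZeroArgConstructorSketch {
  trait Table { def name: String }
  trait Format { def getTable(options: Map[String, String]): Table }

  // Has an implicit public no-arg constructor, so reflective loading works.
  final class InMemoryFormat extends Format {
    def getTable(options: Map[String, String]): Table =
      new Table { val name: String = options.getOrElse("table", "default") }
  }

  // Locate the class by name and instantiate it via its zero-argument constructor.
  def load(className: String): Format =
    Class.forName(className).getDeclaredConstructor().newInstance().asInstanceOf[Format]
}
```

A class whose only constructor takes arguments would make `getDeclaredConstructor()` throw `NoSuchMethodException`, which is why the contract is stated up front.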
[GitHub] spark pull request #22750: [SPARK-25747][SQL] remove ColumnarBatchScan.needs...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22750#discussion_r226812447 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala --- @@ -164,12 +162,11 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { val outputVars = output.zipWithIndex.map { case (a, i) => --- End diff -- good catch!
[GitHub] spark issue #22501: [SPARK-25492][TEST] Refactor WideSchemaBenchmark to use ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22501 thank you guys for refreshing the benchmarks and results! It's very helpful. If possible, can we post the perf regressions we found to the umbrella JIRA? Then people can see whether each regression is reasonable (if we have already addressed it) or investigate how it was introduced. Thanks!
[GitHub] spark issue #22763: [SPARK-25764][ML][EXAMPLES] Update BisectingKMeans examp...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22763 This has been reverted from master/2.4
[GitHub] spark issue #22764: [SPARK-25765][ML] Add training cost to BisectingKMeans s...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22764 Since this PR is a little more complicated than we expected, we decided not to include it in 2.4.0. I'm not sure if we can treat it as a special case and put it in 2.4.1, cc @mengxr. Anyway, the other 2 related PRs (deprecating the API and updating the example) have been reverted. We need to think about what we should do if we can only do this in 3.0.
[GitHub] spark pull request #22466: [SPARK-25464][SQL] Create Database to the locatio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22466#discussion_r226655438 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala --- @@ -207,6 +207,14 @@ class SessionCatalog( "you cannot create a database with this name.") } validateName(dbName) +// SPARK-25464 fail if DB location exists and is not empty +val dbPath = new Path(dbDefinition.locationUri) +val fs = dbPath.getFileSystem(hadoopConf) +if (!externalCatalog.databaseExists(dbName) && fs.exists(dbPath) + && fs.listStatus(dbPath).nonEmpty) { --- End diff -- If the behavior needs to be consistent across all external catalog implementations (it should be), then I think putting it here is reasonable. But listing files might be too expensive (e.g. it could take several hours). Is there a better way to check if a directory is empty?
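One way to answer the emptiness question is to stop after the first directory entry instead of materializing the full listing. The Java sketch below shows the idea on the local file system with `java.nio.file.Files.newDirectoryStream`; the helper name `existsAndNonEmpty` is hypothetical. Hadoop's `FileSystem.listStatusIterator` may allow a similar early exit, but whether it really avoids a full listing depends on the underlying file system implementation.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

final class DirCheck {
    private DirCheck() {}

    // Returns true only if `dir` is an existing directory with at least one entry.
    // The directory stream is iterated lazily, so iteration stops after the first
    // entry instead of collecting the whole listing.
    static boolean existsAndNonEmpty(Path dir) throws IOException {
        if (!Files.isDirectory(dir)) {
            return false;
        }
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            return entries.iterator().hasNext();
        }
    }
}
```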
[GitHub] spark pull request #22764: [SPARK-25765][ML] Add training cost to BisectingK...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22764#discussion_r226652377 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala --- @@ -225,13 +227,14 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] { assert(formatVersion == thisFormatVersion) val rootId = (metadata \ "rootId").extract[Int] val distanceMeasure = (metadata \ "distanceMeasure").extract[String] + val trainingCost = (metadata \ "trainingCost").extract[Double] --- End diff -- Do other models have this problem? I was told that this change just follows what we did for other models before.
[GitHub] spark issue #22766: [SPARK-25768][SQL] fix constant argument expecting UDAFs
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22766 thanks, merging to master/2.4/2.3!
[GitHub] spark issue #22743: [SPARK-25740][SQL] Refactor DetermineTableStats to inval...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22743 > Datasource table will not cache in tableRelationCache. I don't think so. Spark caches data source tables in `FindDataSourceTable`.
[GitHub] spark pull request #22666: [SPARK-25672][SQL] schema_of_csv() - schema infer...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22666#discussion_r226641023 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala --- @@ -3886,6 +3886,31 @@ object functions { withExpr(new CsvToStructs(e.expr, schema.expr, options.asScala.toMap)) } + /** + * Parses a column containing a CSV string and infers its schema. + * + * @param e a string column containing CSV data. + * + * @group collection_funcs + * @since 3.0.0 + */ + def schema_of_csv(e: Column): Column = withExpr(new SchemaOfCsv(e.expr)) + + /** + * Parses a column containing a CSV string and infers its schema using options. + * + * @param e a string column containing CSV data. + * @param options options to control how the CSV is parsed. accepts the same options and the + *json data source. See [[DataFrameReader#csv]]. + * @return a column with string literal containing schema in DDL format. + * + * @group collection_funcs + * @since 3.0.0 + */ + def schema_of_csv(e: Column, options: java.util.Map[String, String]): Column = { --- End diff -- Shall we also have an API that takes a Scala Map?
[GitHub] spark pull request #22666: [SPARK-25672][SQL] schema_of_csv() - schema infer...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22666#discussion_r226640860 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala --- @@ -155,4 +155,15 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P }.getCause assert(exception.getMessage.contains("from_csv() doesn't support the DROPMALFORMED mode")) } + + test("infer schema of CSV strings") { +checkEvaluation(new SchemaOfCsv(Literal.create("1,abc")), "struct<_c0:int,_c1:string>") + } + + test("infer schema of CSV strings by using options") { +checkEvaluation( + new SchemaOfCsv(Literal.create("1|abc"), +CreateMap(Seq(Literal.create("delimiter"), Literal.create("|", --- End diff -- The main constructor of `SchemaOfCsv` accepts `Map[String, String]` directly; shall we use that?
[GitHub] spark pull request #22666: [SPARK-25672][SQL] schema_of_csv() - schema infer...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22666#discussion_r226640362 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala --- @@ -60,7 +63,7 @@ case class CsvToStructs( // Used in `FunctionRegistry` def this(child: Expression, schema: Expression, options: Map[String, String]) = this( - schema = ExprUtils.evalSchemaExpr(schema), + schema = ExprUtils.evalSchemaExpr(schema).asInstanceOf[StructType], --- End diff -- why do we need `asInstanceOf`?
[GitHub] spark issue #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF constructor sig...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22732 thanks, merging to master/2.4!
[GitHub] spark issue #22750: [SPARK-25747][SQL] remove ColumnarBatchScan.needsUnsafeR...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22750 `DataSourceScanExec` does not have `needsUnsafeRowConversion`
[GitHub] spark issue #22743: [SPARK-25740][SQL] Refactor DetermineTableStats to inval...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22743 Why is it only a problem for Hive tables?
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22732#discussion_r226520350 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala --- @@ -932,6 +935,23 @@ trait ScalaReflection { tpe.dealias.erasure.typeSymbol.asClass.fullName } + /** + * Returns the nullability of the input parameter types of the scala function object. + * + * Note that this only works with Scala 2.11, and the information returned may be inaccurate if + * used with a different Scala version. --- End diff -- Shall we explicitly return a Seq of `true` if it's not Scala 2.11? Then the behavior is more predictable than `may be inaccurate`.
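A minimal Java sketch of the suggested predictable fallback, assuming a hypothetical helper that receives the Scala binary version and a reflection callback: when the version is not 2.11 it reports every input as nullable (`true`) instead of returning possibly inaccurate reflection results. The names `UdfInputNullability` and `inputNullability` are illustrative only.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

final class UdfInputNullability {
    private UdfInputNullability() {}

    // Hypothetical helper: trust reflection only on the version where it is
    // known to work (Scala 2.11 per the review); otherwise return a fixed,
    // documented answer of "every input is nullable".
    static List<Boolean> inputNullability(
            String scalaBinaryVersion, int arity, Supplier<List<Boolean>> reflected) {
        if ("2.11".equals(scalaBinaryVersion)) {
            return reflected.get();
        }
        return Collections.nCopies(arity, Boolean.TRUE);
    }
}
```

The gain is predictability: callers always know what they get on other versions, rather than a result that "may be inaccurate".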
[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r226519284 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala --- @@ -43,10 +44,11 @@ import org.apache.spark.util.Utils *to the name `value`. */ object ExpressionEncoder { + def apply[T : TypeTag](): ExpressionEncoder[T] = { -// We convert the not-serializable TypeTag into StructType and ClassTag. val mirror = ScalaReflection.mirror -val tpe = typeTag[T].in(mirror).tpe +val tpe = ScalaReflection.localTypeOf[T] --- End diff -- `localTypeOf` has a `dealias` at the end.
[GitHub] spark issue #22743: [SPARK-25740][SQL] Refactor DetermineTableStats to inval...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22743 can you explain more about how this happens?
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22732#discussion_r226517584 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala --- @@ -39,29 +42,29 @@ import org.apache.spark.sql.types.DataType * @param nullable True if the UDF can return null value. * @param udfDeterministic True if the UDF is deterministic. Deterministic UDF returns same result * each time it is invoked with a particular input. - * @param nullableTypes which of the inputTypes are nullable (i.e. not primitive) */ case class ScalaUDF( function: AnyRef, dataType: DataType, children: Seq[Expression], +inputsNullSafe: Seq[Boolean], inputTypes: Seq[DataType] = Nil, udfName: Option[String] = None, nullable: Boolean = true, -udfDeterministic: Boolean = true, -nullableTypes: Seq[Boolean] = Nil) +udfDeterministic: Boolean = true) extends Expression with ImplicitCastInputTypes with NonSQLExpression with UserDefinedExpression { // The constructor for SPARK 2.1 and 2.2 def this( function: AnyRef, dataType: DataType, children: Seq[Expression], + inputsNullSafe: Seq[Boolean], --- End diff -- SGTM
[GitHub] spark pull request #22501: [SPARK-25492][TEST] Refactor WideSchemaBenchmark ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22501#discussion_r226516354 --- Diff: sql/core/benchmarks/WideSchemaBenchmark-results.txt --- @@ -1,117 +1,145 @@ -Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6 -Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz + +parsing large select expressions + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6 +Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz parsing large select:Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative -1 select expressions 2 /4 0.0 2050147.0 1.0X -100 select expressions 6 /7 0.0 6123412.0 0.3X -2500 select expressions135 / 141 0.0 134623148.0 0.0X +1 select expressions 2 /4 0.0 1934953.0 1.0X +100 select expressions 4 /5 0.0 3659399.0 0.5X +2500 select expressions 68 / 76 0.0 68278937.0 0.0X -Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6 -Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz + +many column field read and write + + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6 +Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz many column field r/w: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative -1 cols x 10 rows (read in-mem) 16 / 18 6.3 158.6 1.0X -1 cols x 10 rows (exec in-mem) 17 / 19 6.0 166.7 1.0X -1 cols x 10 rows (read parquet) 24 / 26 4.3 235.1 0.7X -1 cols x 10 rows (write parquet)81 / 85 1.2 811.3 0.2X -100 cols x 1000 rows (read in-mem) 17 / 19 6.0 166.2 1.0X -100 cols x 1000 rows (exec in-mem) 25 / 27 4.0 249.2 0.6X -100 cols x 1000 rows (read parquet) 23 / 25 4.4 226.0 0.7X -100 cols x 1000 rows (write parquet)83 / 87 1.2 831.0 0.2X -2500 cols x 40 rows (read in-mem) 132 / 137 0.8 1322.9 0.1X -2500 cols x 40 rows (exec in-mem) 326 / 330 0.3 3260.6 0.0X -2500 cols x 40 rows (read parquet) 831 / 839 0.1 8305.8 0.0X -2500 cols x 40 rows (write parquet)237 / 245 0.4 2372.6 0.1X - -Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6 -Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz +1 
cols x 10 rows (read in-mem) 22 / 25 4.6 219.4 1.0X +1 cols x 10 rows (exec in-mem) 22 / 28 4.5 223.8 1.0X +1 cols x 10 rows (read parquet) 45 / 49 2.2 449.6 0.5X +1 cols x 10 rows (write parquet) 204 / 223 0.5 2044.4 0.1X --- End diff -- I have no idea how this happens. Can you create a JIRA ticket to investigate this regression?
[GitHub] spark issue #22763: [SPARK-25764][ML][EXAMPLES] Update BisectingKMeans examp...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22763 thanks, merging to master/2.4!
[GitHub] spark pull request #22764: [SPARK-25765][ML] Add training cost to BisectingK...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22764#discussion_r226512051 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala --- @@ -310,4 +317,6 @@ class BisectingKMeansSummary private[clustering] ( predictionCol: String, featuresCol: String, k: Int, -numIter: Int) extends ClusteringSummary(predictions, predictionCol, featuresCol, k, numIter) +numIter: Int, +@Since("2.4.0") val trainingCost: Double) --- End diff -- SGTM
[GitHub] spark issue #22750: [SPARK-25747][SQL] remove ColumnarBatchScan.needsUnsafeR...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22750 which description is inaccurate?
[GitHub] spark pull request #22766: [SPARK-25768][SQL] fix constant argument expectin...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22766#discussion_r226511589 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala --- @@ -340,39 +340,39 @@ private[hive] case class HiveUDAFFunction( resolver.getEvaluator(parameterInfo) } - // The UDAF evaluator used to consume raw input rows and produce partial aggregation results. - @transient - private lazy val partial1ModeEvaluator = newEvaluator() + private case class PartialEvaluator( +evaluator: GenericUDAFEvaluator, +objectInspector: ObjectInspector) + // The UDAF evaluator used to consume raw input rows and produce partial aggregation results. // Hive `ObjectInspector` used to inspect partial aggregation results. @transient - private val partialResultInspector = partial1ModeEvaluator.init( -GenericUDAFEvaluator.Mode.PARTIAL1, -inputInspectors - ) + private lazy val partial1Mode = { +val evaluator = newEvaluator() +PartialEvaluator(evaluator, evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors)) + } // The UDAF evaluator used to merge partial aggregation results. @transient private lazy val partial2ModeEvaluator = { val evaluator = newEvaluator() -evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, Array(partialResultInspector)) +evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, Array(partial1Mode.objectInspector)) evaluator } // Spark SQL data type of partial aggregation results @transient - private lazy val partialResultDataType = inspectorToDataType(partialResultInspector) + private lazy val partialResultDataType = inspectorToDataType(partial1Mode.objectInspector) // The UDAF evaluator used to compute the final result from a partial aggregation result objects. - @transient - private lazy val finalModeEvaluator = newEvaluator() - // Hive `ObjectInspector` used to inspect the final aggregation result object. 
@transient - private val returnInspector = finalModeEvaluator.init( -GenericUDAFEvaluator.Mode.FINAL, -Array(partialResultInspector) - ) + private lazy val finalMode = { --- End diff -- Ah, it's also used in final mode, so maybe `HiveEvaluator` is a better name than `PartialEvaluator`.
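The refactoring under discussion pairs each Hive UDAF evaluator with the `ObjectInspector` returned by its `init` call and creates both lazily, for PARTIAL1 and FINAL alike, which is why a mode-neutral name like `HiveEvaluator` fits. The Java sketch below models that with hypothetical, simplified stand-ins (`Evaluator`, `ObjectInspector`, `Mode`, and a single-threaded `Lazy` holder in place of Scala's `lazy val`); it is not the Hive API.

```java
import java.util.function.Supplier;

// Hypothetical, simplified stand-ins for Hive's UDAF types (not the Hive API).
enum Mode { PARTIAL1, PARTIAL2, FINAL }

interface ObjectInspector {
    String describe();
}

interface Evaluator {
    // Initializing an evaluator for a mode returns the inspector for its output.
    ObjectInspector init(Mode mode, ObjectInspector[] inputs);
}

// Pairs an evaluator with the inspector its init() call returned, so the two
// are always created together regardless of the aggregation mode.
final class HiveEvaluator {
    final Evaluator evaluator;
    final ObjectInspector objectInspector;

    HiveEvaluator(Evaluator evaluator, ObjectInspector objectInspector) {
        this.evaluator = evaluator;
        this.objectInspector = objectInspector;
    }

    static HiveEvaluator create(Evaluator evaluator, Mode mode, ObjectInspector... inputs) {
        return new HiveEvaluator(evaluator, evaluator.init(mode, inputs));
    }
}

// Single-threaded stand-in for Scala's `lazy val`: computes the value on first get().
final class Lazy<T> {
    private Supplier<T> init;
    private T value;

    Lazy(Supplier<T> init) {
        this.init = init;
    }

    T get() {
        if (init != null) {
            value = init.get();
            init = null;
        }
        return value;
    }
}
```

In this sketch the FINAL-mode pair is built from the PARTIAL1 pair's inspector, mirroring how the diff feeds `partial1Mode.objectInspector` into later modes.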
[GitHub] spark pull request #22766: [SPARK-25768][SQL] fix constant argument expectin...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22766#discussion_r226511506 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala --- @@ -340,39 +340,39 @@ private[hive] case class HiveUDAFFunction( resolver.getEvaluator(parameterInfo) } - // The UDAF evaluator used to consume raw input rows and produce partial aggregation results. - @transient - private lazy val partial1ModeEvaluator = newEvaluator() + private case class PartialEvaluator( +evaluator: GenericUDAFEvaluator, +objectInspector: ObjectInspector) + // The UDAF evaluator used to consume raw input rows and produce partial aggregation results. // Hive `ObjectInspector` used to inspect partial aggregation results. @transient - private val partialResultInspector = partial1ModeEvaluator.init( -GenericUDAFEvaluator.Mode.PARTIAL1, -inputInspectors - ) + private lazy val partial1Mode = { +val evaluator = newEvaluator() +PartialEvaluator(evaluator, evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors)) + } // The UDAF evaluator used to merge partial aggregation results. @transient private lazy val partial2ModeEvaluator = { val evaluator = newEvaluator() -evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, Array(partialResultInspector)) +evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, Array(partial1Mode.objectInspector)) evaluator } // Spark SQL data type of partial aggregation results @transient - private lazy val partialResultDataType = inspectorToDataType(partialResultInspector) + private lazy val partialResultDataType = inspectorToDataType(partial1Mode.objectInspector) // The UDAF evaluator used to compute the final result from a partial aggregation result objects. - @transient - private lazy val finalModeEvaluator = newEvaluator() - // Hive `ObjectInspector` used to inspect the final aggregation result object. 
@transient - private val returnInspector = finalModeEvaluator.init( -GenericUDAFEvaluator.Mode.FINAL, -Array(partialResultInspector) - ) + private lazy val finalMode = { --- End diff -- ditto
[GitHub] spark pull request #22766: [SPARK-25768][SQL] fix constant argument expectin...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22766#discussion_r226511479 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala --- @@ -340,39 +340,39 @@ private[hive] case class HiveUDAFFunction( resolver.getEvaluator(parameterInfo) } - // The UDAF evaluator used to consume raw input rows and produce partial aggregation results. - @transient - private lazy val partial1ModeEvaluator = newEvaluator() + private case class PartialEvaluator( +evaluator: GenericUDAFEvaluator, +objectInspector: ObjectInspector) + // The UDAF evaluator used to consume raw input rows and produce partial aggregation results. // Hive `ObjectInspector` used to inspect partial aggregation results. @transient - private val partialResultInspector = partial1ModeEvaluator.init( -GenericUDAFEvaluator.Mode.PARTIAL1, -inputInspectors - ) + private lazy val partial1Mode = { --- End diff -- `partial1ModeEvaluator`
[GitHub] spark pull request #22764: [SPARK-25765][ML] Add training cost to BisectingK...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22764#discussion_r226384584 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala --- @@ -310,4 +317,6 @@ class BisectingKMeansSummary private[clustering] ( predictionCol: String, featuresCol: String, k: Int, -numIter: Int) extends ClusteringSummary(predictions, predictionCol, featuresCol, k, numIter) +numIter: Int, +@Since("2.4.0") val trainingCost: Double) --- End diff -- oh wait. If the final goal is to have a consistent ML API in 3.0, do we have to put this new API in 2.4?
[GitHub] spark pull request #22766: [SPARK-25768][SQL] fix constant argument expectin...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22766#discussion_r226375347 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala --- @@ -339,40 +339,38 @@ private[hive] case class HiveUDAFFunction( val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false) resolver.getEvaluator(parameterInfo) } + + private case class Mode(evaluator: GenericUDAFEvaluator, objectInspector: ObjectInspector) --- End diff -- maybe just `PartialEvaluator`?
[GitHub] spark issue #22766: [SPARK-25768][SQL] fix constant argument expecting UDAFs
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22766 OK to test
[GitHub] spark pull request #22764: [SPARK-25765][ML] Add training cost to BisectingK...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22764#discussion_r226372701 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala --- @@ -310,4 +317,6 @@ class BisectingKMeansSummary private[clustering] ( predictionCol: String, featuresCol: String, k: Int, -numIter: Int) extends ClusteringSummary(predictions, predictionCol, featuresCol, k, numIter) +numIter: Int, +@Since("2.4.0") val trainingCost: Double) --- End diff -- this PR targets 2.4; see more context at https://github.com/apache/spark/pull/22756
[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22547 A major part of this PR is updating the existing streaming sources, which is just moving code around. There are 3 things we need to pay attention to during review: 1. the naming and documentation of the new interfaces. 2. the new streaming query planning workflow; see the PR description for details. 3. the updated tests; make sure nothing is wrong there.
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226363445 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala --- @@ -381,7 +390,7 @@ class StreamSuite extends StreamTest { test("insert an extraStrategy") { try { - spark.experimental.extraStrategies = TestStrategy :: Nil + spark.experimental.extraStrategies = CustomStrategy :: Nil --- End diff -- Since we need to do temporary planning for streaming queries, we can't allow custom strategies to remove streaming leaf nodes.
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226363020 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala --- @@ -154,21 +159,25 @@ class StreamSuite extends StreamTest { } test("SPARK-20432: union one stream with itself") { -val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a") -val unioned = df.union(df) -withTempDir { outputDir => - withTempDir { checkpointDir => -val query = - unioned -.writeStream.format("parquet") -.option("checkpointLocation", checkpointDir.getAbsolutePath) -.start(outputDir.getAbsolutePath) -try { - query.processAllAvailable() - val outputDf = spark.read.parquet(outputDir.getAbsolutePath).as[Long] - checkDatasetUnorderly[Long](outputDf, (0L to 10L).union((0L to 10L)).toArray: _*) -} finally { - query.stop() +val v1Source = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a") +val v2Source = spark.readStream.format(classOf[FakeFormat].getName).load().select("a") + +Seq(v1Source, v2Source).foreach { df => --- End diff -- improve this test to make sure v2 also works.
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226361309 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala --- @@ -319,29 +307,18 @@ class RateSourceSuite extends StreamTest { "rate source does not support user-specified schema")) } - test("continuous in registry") { --- End diff -- we don't need this test now. With the new `Format` abstraction, the lookup logic is unified between microbatch and continuous
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226359031 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchInputStream.scala --- @@ -60,6 +59,14 @@ class RateStreamMicroBatchReadSupport(options: DataSourceOptions, checkpointLoca s" is $maxSeconds, but 'rampUpTimeSeconds' is $rampUpTimeSeconds.") } + private val numPartitions = { --- End diff -- moved from https://github.com/apache/spark/pull/22547/files#diff-6cd4de793a1c68d3d9415a246823b55eL151
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226355931 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -90,6 +140,8 @@ class ContinuousExecution( do { runContinuous(sparkSessionForStream) } while (state.updateAndGet(stateUpdate) == ACTIVE) + +stopSources() --- End diff -- With the new abstraction, we should only stop sources when the streaming query ends, instead of at each reconfiguration.
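The loop in the diff can be modeled in a few lines of Java to show why `stopSources()` belongs after the `do ... while` rather than inside it: each reconfiguration restarts `runContinuous`, but the sources are stopped only once, when the query actually ends. This is a simplified, single-threaded sketch; `QueryState`, the `RECONFIGURING -> ACTIVE` state update, and the event log are assumptions for illustration, not Spark code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

enum QueryState { ACTIVE, RECONFIGURING, TERMINATED }

// Simplified, single-threaded model of the continuous-execution loop.
final class ContinuousLoop {
    final AtomicReference<QueryState> state = new AtomicReference<>(QueryState.ACTIVE);
    final List<String> events = new ArrayList<>();
    private final int reconfigurations;
    private int runs = 0;

    ContinuousLoop(int reconfigurations) {
        this.reconfigurations = reconfigurations;
    }

    // One execution attempt; it ends with either a reconfiguration request or termination.
    private void runContinuous() {
        runs++;
        events.add("run-" + runs);
        state.set(runs <= reconfigurations ? QueryState.RECONFIGURING : QueryState.TERMINATED);
    }

    private void stopSources() {
        events.add("stopSources");
    }

    void runActivatedStream() {
        // A reconfiguration flips the state back to ACTIVE so the loop restarts.
        UnaryOperator<QueryState> stateUpdate =
            s -> s == QueryState.RECONFIGURING ? QueryState.ACTIVE : s;
        do {
            runContinuous();
        } while (state.updateAndGet(stateUpdate) == QueryState.ACTIVE);
        // Sources live across reconfigurations, so they are stopped once, at the very end.
        stopSources();
    }
}
```

With two reconfigurations the model runs three times and records a single `stopSources` event at the end.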
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226338580 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.reader.BatchScan; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; + +/** + * A mix-in interface for {@link Table}. Table implementations can mixin this interface to + * provide data reading ability for batch processing. + */ +@InterfaceStability.Evolving +public interface SupportsBatchRead extends Table { + + /** + * Creates a {@link BatchScan} instance with a {@link ScanConfig} and user-specified options. + * + * @param config a {@link ScanConfig} which may contains operator pushdown information. + * @param options the user-specified options, which is same as the one used to create the + *{@link ScanConfigBuilder} that built the given {@link ScanConfig}. 
--- End diff -- Another option is to let `ScanConfig` carry the options. But `ScanConfig` is an interface, and doing so would put more work on the user side, so I decided to pass the options again here. Feedback is welcome!
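The trade-off can be sketched in plain Scala. Every name below (`Options`, `ScanConfig`, `BatchScan`, `createBatchScan`, `ScanConfigWithOptions`, `ToyTable`) is a hypothetical stand-in and not the real data source V2 API, whose method signature is cut off in the diff above. When the options are passed a second time, a `ScanConfig` implementation only has to hold pushdown results. The alternative would force every implementation to store the options as well.

```scala
// Hypothetical, simplified stand-ins for the data source V2 read-side types.
object ScanOptionsSketch {
  type Options = Map[String, String]

  // Holds pushdown results only; implementors do not have to keep the options.
  trait ScanConfig { def pushedFilters: Seq[String] }

  trait BatchScan { def describe: String }

  // Approach taken in the PR: the options are passed again when creating the scan.
  trait SupportsBatchRead {
    def createBatchScan(config: ScanConfig, options: Options): BatchScan
  }

  // Rejected alternative: every ScanConfig implementation must also carry the options.
  trait ScanConfigWithOptions extends ScanConfig { def options: Options }

  // A toy table that combines the pushdown info with the user-specified options.
  object ToyTable extends SupportsBatchRead {
    def createBatchScan(config: ScanConfig, options: Options): BatchScan = new BatchScan {
      val describe: String =
        s"filters=${config.pushedFilters.mkString(",")} path=${options.getOrElse("path", "?")}"
    }
  }
}
```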
[GitHub] spark issue #22764: [SPARK-25765][ML] Add training cost to BisectingKMeans s...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22764 does the example need to be updated with this new API?
[GitHub] spark issue #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEncoder to...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22749 I like this idea! Waiting for tests to pass.
[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r226301402 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala --- @@ -212,21 +183,88 @@ object ExpressionEncoder { * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer` * and a `deserializer`. * - * @param schema The schema after converting `T` to a Spark SQL row. - * @param serializer A set of expressions, one for each top-level field that can be used to - * extract the values from a raw object into an [[InternalRow]]. - * @param deserializer An expression that will construct an object given an [[InternalRow]]. + * @param objSerializer An expression that can be used to encode a raw object to corresponding + * Spark SQL representation that can be a primitive column, array, map or a + * struct. This represents how Spark SQL generally serializes an object of + * type `T`. + * @param objDeserializer An expression that will construct an object given a Spark SQL + *representation. This represents how Spark SQL generally deserializes + *a serialized value in Spark SQL representation back to an object of + *type `T`. * @param clsTag A classtag for `T`. */ case class ExpressionEncoder[T]( -schema: StructType, -flat: Boolean, -serializer: Seq[Expression], -deserializer: Expression, +objSerializer: Expression, +objDeserializer: Expression, clsTag: ClassTag[T]) extends Encoder[T] { - if (flat) require(serializer.size == 1) + /** + * A set of expressions, one for each top-level field that can be used to + * extract the values from a raw object into an [[InternalRow]]: + * 1. If `serializer` encodes a raw object to a struct, we directly use the `serializer`. + * 2. For other cases, we create a struct to wrap the `serializer`. 
+ */ + val serializer: Seq[NamedExpression] = { +val serializedAsStruct = objSerializer.dataType.isInstanceOf[StructType] +val clsName = Utils.getSimpleName(clsTag.runtimeClass) + +if (serializedAsStruct) { + val nullSafeSerializer = objSerializer.transformUp { +case r: BoundReference => + // For input object of Product type, we can't encode it to row if it's null, as Spark SQL + // doesn't allow top-level row to be null, only its columns can be null. + AssertNotNull(r, Seq("top level Product or row object")) + } + nullSafeSerializer match { +case If(_, _, s: CreateNamedStruct) => s +case s: CreateNamedStruct => s +case _ => + throw new RuntimeException(s"class $clsName has unexpected serializer: $objSerializer") + } +} else { + // For other input objects like primitive, array, map, etc., we construct a struct to wrap + // the serializer which is a column of an row. + CreateNamedStruct(Literal("value") :: objSerializer :: Nil) +} + }.flatten + + /** + * Returns an expression that can be used to deserialize an input row to an object of type `T` + * with a compatible schema. Fields of the row will be extracted using `UnresolvedAttribute`. + * of the same name as the constructor arguments. + * + * For complex objects that are encoded to structs, Fields of the struct will be extracted using + * `GetColumnByOrdinal` with corresponding ordinal. + */ + val deserializer: Expression = { +val serializedAsStruct = objSerializer.dataType.isInstanceOf[StructType] + +if (serializedAsStruct) { + // We serialized this kind of objects to root-level row. The input of general deserializer + // is a `GetColumnByOrdinal(0)` expression to extract first column of a row. We need to + // transform attributes accessors. 
+ objDeserializer.transform { +case UnresolvedExtractValue(GetColumnByOrdinal(0, _), +Literal(part: UTF8String, StringType)) => + UnresolvedAttribute.quoted(part.toString) +case GetStructField(GetColumnByOrdinal(0, dt), ordinal, _) => + GetColumnByOrdinal(ordinal, dt) +case If(IsNull(GetColumnByOrdinal(0, _)), _, n: NewInstance) => n +case If(IsNull(GetColumnByOrdinal(0, _)), _, i: InitializeJavaBean) => i + } +} else { + // For other input objects like primitive, array, map, etc., we deserialize the first column +
[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r226301139 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala --- @@ -212,21 +183,88 @@ object ExpressionEncoder { * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer` * and a `deserializer`. * - * @param schema The schema after converting `T` to a Spark SQL row. - * @param serializer A set of expressions, one for each top-level field that can be used to - * extract the values from a raw object into an [[InternalRow]]. - * @param deserializer An expression that will construct an object given an [[InternalRow]]. + * @param objSerializer An expression that can be used to encode a raw object to corresponding + * Spark SQL representation that can be a primitive column, array, map or a + * struct. This represents how Spark SQL generally serializes an object of + * type `T`. + * @param objDeserializer An expression that will construct an object given a Spark SQL + *representation. This represents how Spark SQL generally deserializes + *a serialized value in Spark SQL representation back to an object of + *type `T`. * @param clsTag A classtag for `T`. */ case class ExpressionEncoder[T]( -schema: StructType, -flat: Boolean, -serializer: Seq[Expression], -deserializer: Expression, +objSerializer: Expression, +objDeserializer: Expression, clsTag: ClassTag[T]) extends Encoder[T] { - if (flat) require(serializer.size == 1) + /** + * A set of expressions, one for each top-level field that can be used to + * extract the values from a raw object into an [[InternalRow]]: + * 1. If `serializer` encodes a raw object to a struct, we directly use the `serializer`. + * 2. For other cases, we create a struct to wrap the `serializer`. 
+ */ + val serializer: Seq[NamedExpression] = { +val serializedAsStruct = objSerializer.dataType.isInstanceOf[StructType] +val clsName = Utils.getSimpleName(clsTag.runtimeClass) + +if (serializedAsStruct) { + val nullSafeSerializer = objSerializer.transformUp { +case r: BoundReference => + // For input object of Product type, we can't encode it to row if it's null, as Spark SQL + // doesn't allow top-level row to be null, only its columns can be null. + AssertNotNull(r, Seq("top level Product or row object")) + } + nullSafeSerializer match { +case If(_, _, s: CreateNamedStruct) => s +case s: CreateNamedStruct => s +case _ => + throw new RuntimeException(s"class $clsName has unexpected serializer: $objSerializer") + } +} else { + // For other input objects like primitive, array, map, etc., we construct a struct to wrap + // the serializer which is a column of an row. + CreateNamedStruct(Literal("value") :: objSerializer :: Nil) +} + }.flatten + + /** + * Returns an expression that can be used to deserialize an input row to an object of type `T` + * with a compatible schema. Fields of the row will be extracted using `UnresolvedAttribute`. + * of the same name as the constructor arguments. + * + * For complex objects that are encoded to structs, Fields of the struct will be extracted using + * `GetColumnByOrdinal` with corresponding ordinal. + */ + val deserializer: Expression = { +val serializedAsStruct = objSerializer.dataType.isInstanceOf[StructType] + +if (serializedAsStruct) { + // We serialized this kind of objects to root-level row. The input of general deserializer + // is a `GetColumnByOrdinal(0)` expression to extract first column of a row. We need to + // transform attributes accessors. 
+ objDeserializer.transform { +case UnresolvedExtractValue(GetColumnByOrdinal(0, _), +Literal(part: UTF8String, StringType)) => + UnresolvedAttribute.quoted(part.toString) +case GetStructField(GetColumnByOrdinal(0, dt), ordinal, _) => + GetColumnByOrdinal(ordinal, dt) +case If(IsNull(GetColumnByOrdinal(0, _)), _, n: NewInstance) => n +case If(IsNull(GetColumnByOrdinal(0, _)), _, i: InitializeJavaBean) => i + } +} else { + // For other input objects like primitive, array, map, etc., we deserialize the first column +
[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r226299441 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala --- @@ -212,21 +183,88 @@ object ExpressionEncoder { * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer` * and a `deserializer`. * - * @param schema The schema after converting `T` to a Spark SQL row. - * @param serializer A set of expressions, one for each top-level field that can be used to - * extract the values from a raw object into an [[InternalRow]]. - * @param deserializer An expression that will construct an object given an [[InternalRow]]. + * @param objSerializer An expression that can be used to encode a raw object to corresponding + * Spark SQL representation that can be a primitive column, array, map or a + * struct. This represents how Spark SQL generally serializes an object of + * type `T`. + * @param objDeserializer An expression that will construct an object given a Spark SQL + *representation. This represents how Spark SQL generally deserializes + *a serialized value in Spark SQL representation back to an object of + *type `T`. * @param clsTag A classtag for `T`. */ case class ExpressionEncoder[T]( -schema: StructType, -flat: Boolean, -serializer: Seq[Expression], -deserializer: Expression, +objSerializer: Expression, +objDeserializer: Expression, clsTag: ClassTag[T]) extends Encoder[T] { - if (flat) require(serializer.size == 1) + /** + * A set of expressions, one for each top-level field that can be used to --- End diff -- set -> sequence
[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r226298803 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala --- @@ -212,21 +183,88 @@ object ExpressionEncoder { * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer` * and a `deserializer`. * - * @param schema The schema after converting `T` to a Spark SQL row. - * @param serializer A set of expressions, one for each top-level field that can be used to - * extract the values from a raw object into an [[InternalRow]]. - * @param deserializer An expression that will construct an object given an [[InternalRow]]. + * @param objSerializer An expression that can be used to encode a raw object to corresponding + * Spark SQL representation that can be a primitive column, array, map or a + * struct. This represents how Spark SQL generally serializes an object of + * type `T`. + * @param objDeserializer An expression that will construct an object given a Spark SQL + *representation. This represents how Spark SQL generally deserializes + *a serialized value in Spark SQL representation back to an object of + *type `T`. * @param clsTag A classtag for `T`. */ case class ExpressionEncoder[T]( -schema: StructType, -flat: Boolean, -serializer: Seq[Expression], -deserializer: Expression, +objSerializer: Expression, +objDeserializer: Expression, clsTag: ClassTag[T]) extends Encoder[T] { - if (flat) require(serializer.size == 1) + /** + * A set of expressions, one for each top-level field that can be used to + * extract the values from a raw object into an [[InternalRow]]: + * 1. If `serializer` encodes a raw object to a struct, we directly use the `serializer`. + * 2. For other cases, we create a struct to wrap the `serializer`. 
+ */ + val serializer: Seq[NamedExpression] = { +val serializedAsStruct = objSerializer.dataType.isInstanceOf[StructType] +val clsName = Utils.getSimpleName(clsTag.runtimeClass) + +if (serializedAsStruct) { + val nullSafeSerializer = objSerializer.transformUp { +case r: BoundReference => + // For input object of Product type, we can't encode it to row if it's null, as Spark SQL + // doesn't allow top-level row to be null, only its columns can be null. + AssertNotNull(r, Seq("top level Product or row object")) + } + nullSafeSerializer match { +case If(_, _, s: CreateNamedStruct) => s --- End diff -- Let's also make sure the `If` condition is `IsNull`, which better explains why we strip it (it can't be null).
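The suggested match can be sketched outside Spark with a tiny hypothetical expression ADT that stands in for Catalyst's `If`, `IsNull` and `CreateNamedStruct` (the real classes carry more fields). The wrapper is stripped only when its condition is a null check. That documents why removing it is safe: the top-level input has already been wrapped in `AssertNotNull`.

```scala
object StripNullCheckSketch {
  // Hypothetical miniature of a Catalyst expression tree.
  sealed trait Expr
  case class Ref(name: String) extends Expr
  case class Lit(value: Any) extends Expr
  case class IsNull(child: Expr) extends Expr
  case class If(cond: Expr, ifTrue: Expr, ifFalse: Expr) extends Expr
  case class CreateNamedStruct(fields: Seq[(String, Expr)]) extends Expr

  def topLevelStruct(serializer: Expr): CreateNamedStruct = serializer match {
    // Strip the wrapper only if it is a null check: the top-level input can't be null here.
    case If(IsNull(_), _, s: CreateNamedStruct) => s
    case s: CreateNamedStruct => s
    // Any other shape, including an If with a non-null-check condition, is rejected.
    case other => throw new RuntimeException(s"unexpected serializer: $other")
  }
}
```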
[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r226296369 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala --- @@ -103,75 +88,61 @@ object ExpressionEncoder { * name/positional binding is preserved. */ def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = { --- End diff -- cool, this method is simplified a lot with the new abstraction.
[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r226295859 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala --- @@ -103,75 +88,61 @@ object ExpressionEncoder { * name/positional binding is preserved. */ def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = { +if (encoders.length > 22) { + throw new RuntimeException("Can't construct a tuple encoder for more than 22 encoders.") +} + encoders.foreach(_.assertUnresolved()) val schema = StructType(encoders.zipWithIndex.map { case (e, i) => -val (dataType, nullable) = if (e.flat) { - e.schema.head.dataType -> e.schema.head.nullable -} else { - e.schema -> true -} -StructField(s"_${i + 1}", dataType, nullable) +StructField(s"_${i + 1}", e.objSerializer.dataType, e.objSerializer.nullable) }) val cls = Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}") -val serializer = encoders.zipWithIndex.map { case (enc, index) => - val originalInputObject = enc.serializer.head.collect { case b: BoundReference => b }.head +val serializers = encoders.zipWithIndex.map { case (enc, index) => + val boundRefs = enc.objSerializer.collect { case b: BoundReference => b }.distinct + assert(boundRefs.size == 1, "object serializer should have only one bound reference but " + +s"there are ${boundRefs.size}") + + val originalInputObject = boundRefs.head val newInputObject = Invoke( BoundReference(0, ObjectType(cls), nullable = true), s"_${index + 1}", -originalInputObject.dataType) +originalInputObject.dataType, +returnNullable = originalInputObject.nullable) - val newSerializer = enc.serializer.map(_.transformUp { + val newSerializer = enc.objSerializer.transformUp { case b: BoundReference if b == originalInputObject => newInputObject --- End diff -- Since there is only one distinct `BoundReference`, we can just write `case b: BoundReference => newInputObject`
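A minimal model of why the equality guard is redundant. It uses a hypothetical three-node expression tree with a simplified `transformUp` rather than Catalyst's `TreeNode`. Once the `distinct` assertion guarantees a single bound reference, an unguarded `case _: BoundReference` rule rewrites exactly the nodes that the guarded rule would.

```scala
object RebindInputSketch {
  // Hypothetical miniature expression tree; not Catalyst's TreeNode.
  sealed trait Expr {
    // Bottom-up rewrite: children first, then the rule is tried on this node.
    def transformUp(rule: PartialFunction[Expr, Expr]): Expr = {
      val withNewChildren: Expr = this match {
        case Invoke(target, method) => Invoke(target.transformUp(rule), method)
        case Add(left, right) => Add(left.transformUp(rule), right.transformUp(rule))
        case leaf => leaf
      }
      rule.applyOrElse(withNewChildren, (e: Expr) => e)
    }

    def boundRefs: Seq[BoundReference] = this match {
      case b: BoundReference => Seq(b)
      case Invoke(target, _) => target.boundRefs
      case Add(left, right) => left.boundRefs ++ right.boundRefs
    }
  }
  case class BoundReference(ordinal: Int) extends Expr
  case class Invoke(target: Expr, method: String) extends Expr
  case class Add(left: Expr, right: Expr) extends Expr

  // Rebinds a per-field serializer so that it reads field `_index+1` of a tuple input.
  def rebindToTupleField(serializer: Expr, index: Int): Expr = {
    val refs = serializer.boundRefs.distinct
    assert(refs.size == 1, s"expected exactly one bound reference but found ${refs.size}")
    val newInput = Invoke(BoundReference(0), s"_${index + 1}")
    // With a single distinct reference, no `if b == original` guard is needed.
    serializer.transformUp { case _: BoundReference => newInput }
  }
}
```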
[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r226294255 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala --- @@ -103,75 +88,61 @@ object ExpressionEncoder { * name/positional binding is preserved. */ def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = { +if (encoders.length > 22) { --- End diff -- Can we do this in a separate PR with a test?
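One rough shape for such a test, sketched outside Spark. The helper `tupleClassName` is hypothetical. It mirrors the `scala.Tuple${encoders.size}` class lookup in the diff and applies the same guard first, because the standard library only defines `TupleN` classes up to 22.

```scala
object TupleArityCheck {
  // The Scala standard library defines scala.Tuple1 through scala.Tuple22 only.
  val MaxTupleArity = 22

  // Hypothetical helper: the name of the tuple class a tuple encoder would load.
  def tupleClassName(numEncoders: Int): String = {
    require(numEncoders >= 1, "need at least one encoder")
    if (numEncoders > MaxTupleArity) {
      throw new RuntimeException(
        s"Can't construct a tuple encoder for more than $MaxTupleArity encoders.")
    }
    s"scala.Tuple$numEncoders"
  }
}
```

Failing fast with a clear message is arguably better than letting the class lookup throw a `ClassNotFoundException` for `scala.Tuple23`.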
[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r226294017 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala --- @@ -43,10 +44,11 @@ import org.apache.spark.util.Utils *to the name `value`. */ object ExpressionEncoder { + def apply[T : TypeTag](): ExpressionEncoder[T] = { -// We convert the not-serializable TypeTag into StructType and ClassTag. val mirror = ScalaReflection.mirror -val tpe = typeTag[T].in(mirror).tpe +val tpe = ScalaReflection.localTypeOf[T] --- End diff -- why change it from `typeTag[T].in(mirror).tpe`?
[GitHub] spark issue #22721: [SPARK-25403][SQL] Refreshes the table after inserting t...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22721 I think it's reasonable to follow `InsertIntoHiveTable`, but it would be better to explain in more detail what changes in `InsertIntoHadoopFsRelationCommand`:
1. What's refreshed? Previously we refreshed the data cache via the path and also refreshed the file index, but the plan cache was left in place. Now we refresh the plan cache. Since the file index lives in the plan, refreshing the plan cache makes a separate file-index refresh unnecessary, but the data cache still needs to be refreshed.
2. What's the performance impact? The plan cache is very useful when reading partitioned tables, because it avoids listing files repeatedly. But this seems OK: we already refreshed the file index before, so files had to be re-listed after the insertion anyway.
[GitHub] spark pull request #22721: [SPARK-25403][SQL] Refreshes the table after inse...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22721#discussion_r226280121 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala --- @@ -189,6 +189,7 @@ case class InsertIntoHadoopFsRelationCommand( sparkSession.catalog.refreshByPath(outputPath.toString) if (catalogTable.nonEmpty) { + sparkSession.catalog.refreshTable(catalogTable.get.identifier.quotedString) --- End diff -- shall we follow `InsertIntoHiveTable` and use `sparkSession.sessionState.catalog.refreshTable`?
[GitHub] spark issue #22756: [SPARK-25758][ML] Deprecate computeCost on BisectingKMea...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22756 LGTM
[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22547 Hi @rdblue, welcome back! I just rebased it, so it's ready for review :)
[GitHub] spark pull request #22721: [SPARK-25403][SQL] Refreshes the table after inse...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22721#discussion_r226208576 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala --- @@ -189,6 +189,7 @@ case class InsertIntoHadoopFsRelationCommand( sparkSession.catalog.refreshByPath(outputPath.toString) if (catalogTable.nonEmpty) { + sparkSession.catalog.refreshTable(catalogTable.get.identifier.quotedString) --- End diff -- shall we do:
```
if (catalogTable.isDefined) {
  sparkSession.catalog.refreshTable(catalogTable.get.identifier.quotedString)
} else {
  // refresh cached files in FileIndex
  fileIndex.foreach(_.refresh())
  // refresh data cache if table is cached
  sparkSession.catalog.refreshByPath(outputPath.toString)
}
```
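The proposed branching can be checked in isolation with a small model that records which refresh calls would be made instead of calling Spark. The function `refreshAfterInsert` and its string-typed parameters are hypothetical. In the real command, `catalogTable` holds a catalog table and `fileIndex` appears to be `Option`-like, judging by the `fileIndex.foreach(...)` call. The sketch also assumes, per the `Catalog.refreshTable` documentation, that refreshing a table covers both its cached metadata and its cached data.

```scala
import scala.collection.mutable.ArrayBuffer

object RefreshAfterInsertSketch {
  // Returns the refresh calls that would be issued after an insert.
  def refreshAfterInsert(
      catalogTable: Option[String], // quoted identifier of the target table, if any
      fileIndex: Option[String],    // stand-in for an optional file index
      outputPath: String): List[String] = {
    val calls = ArrayBuffer.empty[String]
    if (catalogTable.isDefined) {
      // Table target: one table refresh invalidates the cached plan (and the
      // file index inside it) as well as any cached data.
      calls += s"refreshTable(${catalogTable.get})"
    } else {
      // Path-only target: refresh cached files in the file index, if there is one...
      fileIndex.foreach(idx => calls += s"refresh($idx)")
      // ...and the data cache, in case the path is cached.
      calls += s"refreshByPath($outputPath)"
    }
    calls.toList
  }
}
```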
[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22758#discussion_r226198591 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala --- @@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log None) val logicalRelation = cached.getOrElse { val updatedTable = inferIfNeeded(relation, options, fileFormat) + // Intialize the catalogTable stats if its not defined.An intial value has to be defined --- End diff -- > but after create table command, when we do insert command within the same session Hive statistics is not getting updated
This is the part I don't understand. As I said before, even if the table has no stats, Spark will still get stats via the `DetermineTableStats` rule.
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22732#discussion_r226156536 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala --- @@ -393,4 +393,30 @@ class UDFSuite extends QueryTest with SharedSQLContext { checkAnswer(df, Seq(Row("12"), Row("24"), Row("3null"), Row(null))) } } + + test("SPARK-25044 Verify null input handling for primitive types - with udf()") { +val udf1 = udf({(x: Long, y: Any) => x * 2 + (if (y == null) 1 else 0)}) +val df = spark.range(0, 3).toDF("a") + .withColumn("b", udf1($"a", lit(null))) + .withColumn("c", udf1(lit(null), $"a")) + +checkAnswer( + df, + Seq( +Row(0, 1, null), +Row(1, 3, null), +Row(2, 5, null))) + } + + test("SPARK-25044 Verify null input handling for primitive types - with udf.register") { +withTable("t") { --- End diff -- Ah, we should do `(null, new Integer(1), "x"), ("N", null, "y"), ("N", new Integer(3), null)`
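The boxed values matter because a Scala `Int` cannot be `null`, so a tuple column that needs null entries has to use `java.lang.Integer`. Below is a minimal sketch of such test rows (the object name `NullableTestData` is hypothetical). It uses `Integer.valueOf` instead of the `new Integer(...)` constructor from the comment, because that constructor is deprecated in newer JDKs. Each row holds exactly one null, each in a different column, so every parameter of the registered UDF receives a null input once.

```scala
object NullableTestData {
  // Middle column is java.lang.Integer (not Int) so that it can hold null.
  val rows: Seq[(String, Integer, String)] = Seq(
    (null, Integer.valueOf(1), "x"),
    ("N", null, "y"),
    ("N", Integer.valueOf(3), null))
}
```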
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22732#discussion_r226156400 --- Diff: docs/sql-programming-guide.md --- @@ -1978,6 +1978,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see - Since Spark 2.4, empty strings are saved as quoted empty strings `""`. In version 2.3 and earlier, empty strings are equal to `null` values and do not reflect to any characters in saved CSV files. For example, the row of `"a", null, "", 1` was writted as `a,,,1`. Since Spark 2.4, the same row is saved as `a,,"",1`. To restore the previous behavior, set the CSV option `emptyValue` to empty (not quoted) string. - Since Spark 2.4, The LOAD DATA command supports wildcard `?` and `*`, which match any one character, and zero or more characters, respectively. Example: `LOAD DATA INPATH '/tmp/folder*/'` or `LOAD DATA INPATH '/tmp/part-?'`. Special Characters like `space` also now work in paths. Example: `LOAD DATA INPATH '/tmp/folder name/'`. - In Spark version 2.3 and earlier, HAVING without GROUP BY is treated as WHERE. This means, `SELECT 1 FROM range(10) HAVING true` is executed as `SELECT 1 FROM range(10) WHERE true` and returns 10 rows. This violates SQL standard, and has been fixed in Spark 2.4. Since Spark 2.4, HAVING without GROUP BY is treated as a global aggregate, which means `SELECT 1 FROM range(10) HAVING true` will return only one row. To restore the previous behavior, set `spark.sql.legacy.parser.havingWithoutGroupByAsWhere` to `true`. + - Since Spark 2.4, use of the method `def udf(f: AnyRef, dataType: DataType): UserDefinedFunction` should not expect any automatic null handling of the input parameters, thus a null input of a Scala primitive type will be converted to the type's corresponding default value in the UDF. All other UDF declaration and registration methods remain the same behavior as before. 
--- End diff -- yes, this fixes a bug introduced by a commit in 2.4
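The conversion the migration note describes can be reproduced in plain Scala, without Spark. When a function with a primitive `Long` parameter is called through an erased `Any => Any` view, a `null` argument is unboxed to `0L` instead of being rejected. The assumption here is that Spark invokes an untyped `udf(f: AnyRef, dataType)` function in a similarly erased way, because it has no input type information for inserting null checks. `PrimitiveNullDefault` and `callErased` are hypothetical names.

```scala
object PrimitiveNullDefault {
  // A UDF body written against a Scala primitive parameter type.
  val doubled: Long => Long = x => x * 2

  // Erased call: a null argument is unboxed to the primitive default (0L) before `doubled` runs.
  def callErased(f: Long => Long, arg: Any): Any = f.asInstanceOf[Any => Any](arg)
}
```

With the typed `udf(...)` overloads, Spark knows the parameter types and can return null for a null primitive input instead, as the `udf()` test quoted earlier in this thread expects.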