[GitHub] spark issue #22827: [SPARK-25832][SQL][BRANCH-2.4] Revert newly added map re...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22827
  
thanks, merging to 2.4!


---




[GitHub] spark pull request #22029: [SPARK-24395][SQL] IN operator should return NULL...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22029#discussion_r228245697
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
 ---
@@ -202,7 +209,11 @@ case class InSubquery(values: Seq[Expression], query: 
ListQuery)
  */
 // scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals 
to any valN.",
+  usage = """
+expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any 
valN. Otherwise, if
+  spark.sql.legacy.inOperator.falseForNullField is false and any of 
the elements or fields of
+  the elements is null it returns null, else it returns false.
--- End diff --

I vaguely remember that multi-line string doesn't work with 
`ExpressionDescription`. Can you verify it with `DESCRIBE FUNCTION`?
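For reference, a quick way to check is something like this in `spark-shell` (a sketch; a broken multi-line usage would show up garbled in the output):

```
// Sketch: inspect how the usage string of the `in` function is rendered.
spark.sql("DESCRIBE FUNCTION EXTENDED in").show(truncate = false)
```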


---




[GitHub] spark issue #22814: [SPARK-25819][SQL] Support parse mode option for the fun...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22814
  
LGTM


---




[GitHub] spark issue #22812: [SPARK-25817][SQL] Dataset encoder should support combin...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22812
  
retest this please


---




[GitHub] spark pull request #22809: [SPARK-19851][SQL] Add support for EVERY and ANY ...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22809#discussion_r228238508
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Max.scala
 ---
@@ -57,3 +57,34 @@ case class Max(child: Expression) extends 
DeclarativeAggregate {
 
   override lazy val evaluateExpression: AttributeReference = max
 }
+
+abstract class AnyAggBase(arg: Expression)
--- End diff --

If it's hard to decide where to put it, I think putting it in a new file 
can be considered.


---




[GitHub] spark pull request #22809: [SPARK-19851][SQL] Add support for EVERY and ANY ...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22809#discussion_r228238276
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Max.scala
 ---
@@ -57,3 +57,34 @@ case class Max(child: Expression) extends 
DeclarativeAggregate {
 
   override lazy val evaluateExpression: AttributeReference = max
 }
+
+abstract class AnyAggBase(arg: Expression)
+  extends UnevaluableAggregate with ImplicitCastInputTypes {
+
+  override def children: Seq[Expression] = arg :: Nil
+
+  override def dataType: DataType = BooleanType
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(BooleanType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+arg.dataType match {
+  case dt if dt != BooleanType =>
+TypeCheckResult.TypeCheckFailure(s"Input to function '$prettyName' 
should have been " +
+  s"${BooleanType.simpleString}, but it's 
[${arg.dataType.catalogString}].")
+  case _ => TypeCheckResult.TypeCheckSuccess
+}
+  }
+}
+
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns true if at least one value of `expr` is 
true.")
--- End diff --

Ideally we need `Since` here. Some functions don't have it because the `Since` field didn't exist at that time. We should add the missing `Since` to them as well, if someone has time to do it.
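For example, something like this (a sketch; the class name and the version are placeholders):

```
@ExpressionDescription(
  usage = "_FUNC_(expr) - Returns true if at least one value of `expr` is true.",
  since = "3.0.0")
case class AnyAgg(arg: Expression) extends AnyAggBase(arg)
```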


---




[GitHub] spark issue #22675: [SPARK-25347][ML][DOC] Spark datasource for image/libsvm...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22675
  
thanks, merging to master/2.4!


---




[GitHub] spark issue #22790: [SPARK-25793][ML]call SaveLoadV2_0.load for classNameV2_...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22790
  
Is this ready to go? We are going to have another RC, and it would be good to include this fix.


---




[GitHub] spark issue #22775: [SPARK-24709][SQL][FOLLOW-UP] Make schema_of_json's inpu...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22775
  
I think I'm not qualified to make the decision here, as I don't fully 
understand the use case.

It looks to me that one use case would be to run `schema_of_json` on a 
column, manually figure out the final schema, and then use that schema in 
`from_json`. If `schema_of_json` only accepts literals, I'm not sure how 
users would use it.
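For context, the workflow I have in mind is roughly this (a sketch; `jsonDF` and the sample value are made up for illustration):

```
import org.apache.spark.sql.functions.{col, from_json, lit, schema_of_json}

// Infer the schema from one sample value and inspect it manually...
val sample = """{"a": 1, "b": "x"}"""
val inferred = spark.range(1).select(schema_of_json(lit(sample))).head().getString(0)

// ...then reuse the inferred schema to parse the whole column.
val parsed = jsonDF.select(from_json(col("json"), lit(inferred)).as("parsed"))
```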


---




[GitHub] spark issue #22825: [SPARK-25772][SQL][FOLLOWUP] remove GetArrayFromMap

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22825
  
cc @vofque  @viirya 


---




[GitHub] spark pull request #22825: [SPARK-25772][SQL][FOLLOWUP] remove GetArrayFromM...

2018-10-25 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/22825

[SPARK-25772][SQL][FOLLOWUP] remove GetArrayFromMap

## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/22745 we introduced the 
`GetArrayFromMap` expression. Later on I realized this is duplicated as we 
already have `MapKeys` and `MapValues`.

This PR removes `GetArrayFromMap`

## How was this patch tested?

existing tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark minor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22825.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22825


commit a6c6faa60d8846fb50845839905fc0b938046e02
Author: Wenchen Fan 
Date:   2018-10-25T14:29:50Z

remove GetArrayFromMap




---




[GitHub] spark pull request #22812: [SPARK-25817][SQL] Dataset encoder should support...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22812#discussion_r228195016
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ---
@@ -119,10 +119,9 @@ object ExpressionEncoder {
 }
 
 val childrenDeserializers = encoders.zipWithIndex.map { case (enc, 
index) =>
-  val getColumnsByOrdinals = enc.objDeserializer.collect { case c: 
GetColumnByOrdinal => c }
-.distinct
-  assert(getColumnsByOrdinals.size == 1, "object deserializer should 
have only one " +
-s"`GetColumnByOrdinal`, but there are 
${getColumnsByOrdinals.size}")
+  val getColExprs = enc.objDeserializer.collect { case c: 
GetColumnByOrdinal => c }.distinct
--- End diff --

unrelated but to fix minor code style issues in 
https://github.com/apache/spark/pull/22749


---




[GitHub] spark issue #22821: [SPARK-25832][SQL] remove newly added map related functi...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22821
  
@dongjoon-hyun if there are advanced users who know all the background and 
still want to use these functions, why should we stop them? If end users can't 
hit the bug with public APIs, I think we are fine.


---




[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22814#discussion_r228164819
  
--- Diff: 
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
 ---
@@ -167,4 +184,26 @@ class AvroCatalystDataConversionSuite extends 
SparkFunSuite with ExpressionEvalH
 // avro reader reads the first 4 bytes of a double as a float, the 
result is totally undefined.
 checkResult(data, avroTypeJson, 5.848603E35f)
   }
+
+  test("Handle unsupported input of record type") {
+val actualSchema = StructType(Seq(
+  StructField("col_0", StringType, false),
+  StructField("col_1", ShortType, false),
+  StructField("col_2", DecimalType(8, 4), false),
+  StructField("col_3", BooleanType, true),
+  StructField("col_4", DecimalType(38, 38), false)))
+
+val expectedSchema = StructType(Seq(
+  StructField("col_0", BinaryType, false),
+  StructField("col_1", DoubleType, false),
+  StructField("col_2", DecimalType(18, 4), false),
+  StructField("col_3", StringType, true),
+  StructField("col_4", DecimalType(38, 38), false)))
+
+val data = RandomDataGenerator.randomRow(new scala.util.Random, 
actualSchema)
--- End diff --

Let's include the seed with `withClue`, so that people can reproduce test 
failures.
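Something along these lines (a sketch; only the seed handling is new, the rest follows the diff):

```
// Generate the row from an explicit seed and surface the seed on failure.
val seed = scala.util.Random.nextLong()
withClue(s"seed = $seed: ") {
  val data = RandomDataGenerator.randomRow(new scala.util.Random(seed), actualSchema)
  // ... run the existing checks on `data`
}
```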


---




[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22814#discussion_r228164087
  
--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -11,6 +11,10 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In PySpark, when creating a `SparkSession` with 
`SparkSession.builder.getOrCreate()`, if there is an existing `SparkContext`, 
the builder was trying to update the `SparkConf` of the existing `SparkContext` 
with configurations specified to the builder, but the `SparkContext` is shared 
by all `SparkSession`s, so we should not update them. Since 3.0, the builder 
comes to not update the configurations. This is the same behavior as Java/Scala 
API in 2.3 and above. If you want to update them, you need to update them prior 
to creating a `SparkSession`.
 
+  - In Avro data source, the function `from_avro` supports following parse 
modes:
+* `PERMISSIVE`: Corrupt records are processed as null result. To 
implement this, the data schema is forced to be fully nullable, which might be 
different from the one user provided. This is the default mode.
+* `FAILFAST`: Throws an exception on processing corrupted record.
--- End diff --

Let's explain what changes.


---




[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22814#discussion_r228162464
  
--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -11,6 +11,10 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In PySpark, when creating a `SparkSession` with 
`SparkSession.builder.getOrCreate()`, if there is an existing `SparkContext`, 
the builder was trying to update the `SparkConf` of the existing `SparkContext` 
with configurations specified to the builder, but the `SparkContext` is shared 
by all `SparkSession`s, so we should not update them. Since 3.0, the builder 
comes to not update the configurations. This is the same behavior as Java/Scala 
API in 2.3 and above. If you want to update them, you need to update them prior 
to creating a `SparkSession`.
 
+  - In Avro data source, the function `from_avro` supports following parse 
modes:
+* `PERMISSIVE`: Corrupt records are processed as null result. To 
implement this, the data schema is forced to be fully nullable, which might be 
different from the one user provided. This is the default mode.
+* `FAILFAST`: Throws an exception on processing corrupted record.
--- End diff --

We don't change any existing API; we add a new `from_avro` overload that takes an 
extra `options` parameter. Users won't hit any problem when upgrading Spark, and 
ideally they should read the release notes and use this new feature if they need 
it. I don't think we need to put it in the migration guide.

Let's not abuse the migration guide.
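A rough usage sketch of the additive change (the exact signature of the new overload is an assumption based on this diff, and `df`/`avroTypeStruct` are as in the AvroFunctionsSuite test of this PR):

```
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.col

val avroDF = df.select(to_avro(col("struct")).as("avro"))
avroDF.select(from_avro(col("avro"), avroTypeStruct))                              // existing call, unchanged
avroDF.select(from_avro(col("avro"), avroTypeStruct, Map("mode" -> "PERMISSIVE"))) // new overload with options
```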


---




[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22814#discussion_r228160490
  
--- Diff: 
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala 
---
@@ -61,6 +59,24 @@ class AvroFunctionsSuite extends QueryTest with 
SharedSQLContext {
 checkAnswer(avroStructDF.select(from_avro('avro, avroTypeStruct)), df)
   }
 
+  test("handle invalid input in from_avro") {
+val count = 10
+val df = spark.range(count).select(struct('id, 
'id.as("id2")).as("struct"))
+val avroStructDF = df.select(to_avro('struct).as("avro"))
+val avroTypeStruct = s"""
+  |{
+  |  "type": "record",
+  |  "name": "struct",
+  |  "fields": [
+  |{"name": "col1", "type": "long"},
+  |{"name": "col2", "type": "double"}
+  |  ]
+  |}
+""".stripMargin
+val expected = (0 until count).map(_ => Row(Row(null, null)))
+checkAnswer(avroStructDF.select(from_avro('avro, avroTypeStruct)), 
expected)
--- End diff --

> Also, one good thing about PERMISSIVE mode is that we allow to fill 
invalid records at columnNameOfCorruptRecord

yea Avro can't do it. But returning null instead of failing should still be 
a good thing for `from_avro`.


---




[GitHub] spark issue #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder to get ...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22749
  
thanks, merging to master!


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r228134985
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ---
@@ -212,21 +181,91 @@ object ExpressionEncoder {
  * A generic encoder for JVM objects that uses Catalyst Expressions for a 
`serializer`
  * and a `deserializer`.
  *
- * @param schema The schema after converting `T` to a Spark SQL row.
- * @param serializer A set of expressions, one for each top-level field 
that can be used to
- *   extract the values from a raw object into an 
[[InternalRow]].
- * @param deserializer An expression that will construct an object given 
an [[InternalRow]].
+ * @param objSerializer An expression that can be used to encode a raw 
object to corresponding
+ *   Spark SQL representation that can be a primitive 
column, array, map or a
+ *   struct. This represents how Spark SQL generally 
serializes an object of
+ *   type `T`.
+ * @param objDeserializer An expression that will construct an object 
given a Spark SQL
+ *representation. This represents how Spark SQL 
generally deserializes
+ *a serialized value in Spark SQL representation 
back to an object of
+ *type `T`.
  * @param clsTag A classtag for `T`.
  */
 case class ExpressionEncoder[T](
-schema: StructType,
-flat: Boolean,
-serializer: Seq[Expression],
-deserializer: Expression,
+objSerializer: Expression,
+objDeserializer: Expression,
 clsTag: ClassTag[T])
   extends Encoder[T] {
 
-  if (flat) require(serializer.size == 1)
+  /**
+   * A sequence of expressions, one for each top-level field that can be 
used to
+   * extract the values from a raw object into an [[InternalRow]]:
+   * 1. If `serializer` encodes a raw object to a struct, strip the outer 
If-IsNull and get
+   *the `CreateNamedStruct`.
+   * 2. For other cases, wrap the single serializer with 
`CreateNamedStruct`.
+   */
+  val serializer: Seq[NamedExpression] = {
+val clsName = Utils.getSimpleName(clsTag.runtimeClass)
+
+if (isSerializedAsStruct) {
+  val nullSafeSerializer = objSerializer.transformUp {
+case r: BoundReference =>
+  // For input object of Product type, we can't encode it to row 
if it's null, as Spark SQL
+  // doesn't allow top-level row to be null, only its columns can 
be null.
+  AssertNotNull(r, Seq("top level Product or row object"))
+  }
+  nullSafeSerializer match {
+case If(_: IsNull, _, s: CreateNamedStruct) => s
+case s: CreateNamedStruct => s
--- End diff --

this is minor, we can update it in another PR. We don't need to wait for 
another jenkins QA round.


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r228132840
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ---
@@ -212,21 +181,91 @@ object ExpressionEncoder {
  * A generic encoder for JVM objects that uses Catalyst Expressions for a 
`serializer`
  * and a `deserializer`.
  *
- * @param schema The schema after converting `T` to a Spark SQL row.
- * @param serializer A set of expressions, one for each top-level field 
that can be used to
- *   extract the values from a raw object into an 
[[InternalRow]].
- * @param deserializer An expression that will construct an object given 
an [[InternalRow]].
+ * @param objSerializer An expression that can be used to encode a raw 
object to corresponding
+ *   Spark SQL representation that can be a primitive 
column, array, map or a
+ *   struct. This represents how Spark SQL generally 
serializes an object of
+ *   type `T`.
+ * @param objDeserializer An expression that will construct an object 
given a Spark SQL
+ *representation. This represents how Spark SQL 
generally deserializes
+ *a serialized value in Spark SQL representation 
back to an object of
+ *type `T`.
  * @param clsTag A classtag for `T`.
  */
 case class ExpressionEncoder[T](
-schema: StructType,
-flat: Boolean,
-serializer: Seq[Expression],
-deserializer: Expression,
+objSerializer: Expression,
+objDeserializer: Expression,
 clsTag: ClassTag[T])
   extends Encoder[T] {
 
-  if (flat) require(serializer.size == 1)
+  /**
+   * A sequence of expressions, one for each top-level field that can be 
used to
+   * extract the values from a raw object into an [[InternalRow]]:
+   * 1. If `serializer` encodes a raw object to a struct, strip the outer 
If-IsNull and get
+   *the `CreateNamedStruct`.
+   * 2. For other cases, wrap the single serializer with 
`CreateNamedStruct`.
+   */
+  val serializer: Seq[NamedExpression] = {
+val clsName = Utils.getSimpleName(clsTag.runtimeClass)
+
+if (isSerializedAsStruct) {
+  val nullSafeSerializer = objSerializer.transformUp {
+case r: BoundReference =>
+  // For input object of Product type, we can't encode it to row 
if it's null, as Spark SQL
+  // doesn't allow top-level row to be null, only its columns can 
be null.
+  AssertNotNull(r, Seq("top level Product or row object"))
+  }
+  nullSafeSerializer match {
+case If(_: IsNull, _, s: CreateNamedStruct) => s
+case s: CreateNamedStruct => s
--- End diff --

when will we hit this?


---




[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22621
  
why do we need a migration guide entry for a bug fix?


---




[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22814#discussion_r228063724
  
--- Diff: 
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
 ---
@@ -17,30 +17,49 @@
 
 package org.apache.spark.sql.avro
 
+import org.apache.avro.Schema
+
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.RandomDataGenerator
+import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{ExpressionEvalHelper, 
GenericInternalRow, Literal}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, 
GenericArrayData, MapData}
+import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
-class AvroCatalystDataConversionSuite extends SparkFunSuite with 
ExpressionEvalHelper {
+class AvroCatalystDataConversionSuite extends SparkFunSuite
+  with SharedSQLContext
+  with ExpressionEvalHelper {
 
   private def roundTripTest(data: Literal): Unit = {
 val avroType = SchemaConverters.toAvroType(data.dataType, 
data.nullable)
 checkResult(data, avroType.toString, data.eval())
   }
 
   private def checkResult(data: Literal, schema: String, expected: Any): 
Unit = {
-checkEvaluation(
-  AvroDataToCatalyst(CatalystDataToAvro(data), schema),
-  prepareExpectedResult(expected))
+Seq("FAILFAST", "PERMISSIVE").foreach { mode =>
--- End diff --

why do we test 2 modes here? The mode only matters when a failure happens, but 
`checkResult` covers the case where there are no failures.


---




[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22814#discussion_r228063256
  
--- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala ---
@@ -79,4 +80,23 @@ class AvroOptions(
   val compression: String = {
 
parameters.get("compression").getOrElse(SQLConf.get.avroCompressionCodec)
   }
+
+  @transient private val acceptableParseMode = Seq(PermissiveMode, 
FailFastMode)
--- End diff --

do we need to check the mode here? `AvroOptions` is also used when reading 
Avro files, where DROP MALFORMED can be supported.
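If we drop the check here, the option could be parsed as-is and each caller could reject the modes it doesn't support, e.g. (a sketch; the default mode is an assumption):

```
// Parse the mode without restricting it; from_avro can still reject modes it doesn't support.
val parseMode: ParseMode =
  parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode)
```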


---




[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22814#discussion_r228062884
  
--- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala ---
@@ -31,10 +32,32 @@ package object avro {
* @since 2.4.0
*/
   @Experimental
-  def from_avro(data: Column, jsonFormatSchema: String): Column = {
-new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema))
+  def from_avro(
+  data: Column,
+  jsonFormatSchema: String): Column = {
+new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema, Map.empty))
   }
 
+  /**
+   * Converts a binary column of avro format into its corresponding 
catalyst value. The specified
+   * schema must match the read data, otherwise the behavior is undefined: 
it may fail or return
+   * arbitrary result.
+   *
+   * @param data the binary column.
+   * @param jsonFormatSchema the avro schema in JSON string format.
+   * @param options options to control how the Avro record is parsed.
+   *
+   * @since 2.4.0
--- End diff --

3.0.0


---




[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22814#discussion_r228062545
  
--- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala 
---
@@ -44,24 +59,74 @@ case class AvroDataToCatalyst(child: Expression, 
jsonFormatSchema: String)
 
   @transient private var result: Any = _
 
+  @transient private lazy val parseMode: ParseMode = {
+val mode = AvroOptions(options).parseMode
+if (mode != PermissiveMode && mode != FailFastMode) {
+  throw new AnalysisException(unacceptableModeMessage(mode.name))
+}
+mode
+  }
+
+  private def unacceptableModeMessage(name: String): String = {
+s"from_avro() doesn't support the $name mode. " +
+  s"Acceptable modes are ${PermissiveMode.name} and 
${FailFastMode.name}."
+  }
+
+  @transient private lazy val nullResultRow: Any = dataType match {
+  case st: StructType =>
+val resultRow = new SpecificInternalRow(st.map(_.dataType))
+for(i <- 0 until st.length) {
+  resultRow.setNullAt(i)
+}
+resultRow
+
+  case _ =>
+null
+}
+
+
   override def nullSafeEval(input: Any): Any = {
 val binary = input.asInstanceOf[Array[Byte]]
-decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, 
decoder)
-result = reader.read(result, decoder)
-deserializer.deserialize(result)
+try {
+  decoder = DecoderFactory.get().binaryDecoder(binary, 0, 
binary.length, decoder)
+  result = reader.read(result, decoder)
+  deserializer.deserialize(result)
+} catch {
+  // There could be multiple possible exceptions here, e.g. 
java.io.IOException,
+  // AvroRuntimeException, ArrayIndexOutOfBoundsException, etc.
+  // To make it simple, catch all the exceptions here.
+  case e: Exception => parseMode match {
+case PermissiveMode => nullResultRow
+case FailFastMode =>
+  throw new SparkException("Malformed records are detected in 
record parsing. " +
+s"Parse Mode: ${FailFastMode.name}.", e.getCause)
+case _ =>
+  throw new 
AnalysisException(unacceptableModeMessage(parseMode.name))
+  }
+}
   }
 
   override def simpleString: String = {
-s"from_avro(${child.sql}, ${dataType.simpleString})"
+s"from_avro(${child.sql}, ${dataType.simpleString}, 
${options.toString()})"
   }
 
   override def sql: String = {
-s"from_avro(${child.sql}, ${dataType.catalogString})"
+s"from_avro(${child.sql}, ${dataType.catalogString}, 
${options.toString()})"
   }
 
   override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
 val expr = ctx.addReferenceObj("this", this)
-defineCodeGen(ctx, ev, input =>
-  s"(${CodeGenerator.boxedType(dataType)})$expr.nullSafeEval($input)")
+nullSafeCodeGen(ctx, ev, eval => {
+  val result = ctx.freshName("tempResult")
+  s"""
+${CodeGenerator.boxedType(dataType)} $result =
--- End diff --

nit: maybe
```
val dt = CodeGenerator.boxedType(dataType)

dt $result = ($dt) ...
```


---




[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22814#discussion_r228061155
  
--- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala 
---
@@ -44,24 +59,74 @@ case class AvroDataToCatalyst(child: Expression, 
jsonFormatSchema: String)
 
   @transient private var result: Any = _
 
+  @transient private lazy val parseMode: ParseMode = {
+val mode = AvroOptions(options).parseMode
+if (mode != PermissiveMode && mode != FailFastMode) {
+  throw new AnalysisException(unacceptableModeMessage(mode.name))
+}
+mode
+  }
+
+  private def unacceptableModeMessage(name: String): String = {
+s"from_avro() doesn't support the $name mode. " +
+  s"Acceptable modes are ${PermissiveMode.name} and 
${FailFastMode.name}."
+  }
+
+  @transient private lazy val nullResultRow: Any = dataType match {
+  case st: StructType =>
+val resultRow = new SpecificInternalRow(st.map(_.dataType))
+for(i <- 0 until st.length) {
+  resultRow.setNullAt(i)
+}
+resultRow
+
+  case _ =>
+null
+}
+
+
   override def nullSafeEval(input: Any): Any = {
 val binary = input.asInstanceOf[Array[Byte]]
-decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, 
decoder)
-result = reader.read(result, decoder)
-deserializer.deserialize(result)
+try {
+  decoder = DecoderFactory.get().binaryDecoder(binary, 0, 
binary.length, decoder)
+  result = reader.read(result, decoder)
+  deserializer.deserialize(result)
+} catch {
+  // There could be multiple possible exceptions here, e.g. 
java.io.IOException,
+  // AvroRuntimeException, ArrayIndexOutOfBoundsException, etc.
+  // To make it simple, catch all the exceptions here.
+  case e: Exception => parseMode match {
--- End diff --

we should catch `NonFatal` to be safer
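For reference, `NonFatal` matches everything except fatal throwables (OutOfMemoryError etc.), so those still propagate. A standalone illustration of the pattern (not this PR's code):

```
import scala.util.control.NonFatal

def safeParse(s: String): Option[Int] =
  try Some(s.trim.toInt)
  catch { case NonFatal(_) => None } // fatal throwables are not swallowed here

safeParse("42")   // Some(42)
safeParse("oops") // None
```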


---




[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22814#discussion_r228059940
  
--- Diff: docs/sql-data-sources-avro.md ---
@@ -177,6 +180,18 @@ Data source options of Avro can be set using the 
`.option` method on `DataFrameR
   Currently supported codecs are uncompressed, 
snappy, deflate, bzip2 and 
xz. If the option is not set, the configuration 
spark.sql.avro.compression.codec config is taken into account.
 write
   
+  
+mode
+PERMISSIVE
+The mode option allows to specify parse mode for 
function from_avro.
+  Currently supported modes are:
+  
+PERMISSIVE: Corrupt records are processed as null 
result.
--- End diff --

Not a null result, but a row with all columns set to null.


---




[GitHub] spark issue #22821: [SPARK-25832][SQL] remove newly added map related functi...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22821
  
cc @dongjoon-hyun @gatorsmile @rxin 


---




[GitHub] spark pull request #22821: [SPARK-25832][] remove newly added map related fu...

2018-10-24 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/22821

[SPARK-25832][] remove newly added map related functions from 
FunctionRegistry

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark revert

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22821.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22821


commit 7e919e3efa3d669a6893b47c737e553795d94347
Author: Wenchen Fan 
Date:   2018-10-25T04:42:17Z

remove newly added map related functions from FunctionRegistry




---




[GitHub] spark issue #22773: [SPARK-25785][SQL] Add prettyNames for from_json, to_jso...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22773
  
I think the new names are better and expected, though it's safer to mention 
it in the migration guide in case some users care about it.


---




[GitHub] spark issue #22755: [SPARK-25755][SQL][Test] Supplementation of non-CodeGen ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22755
  
+1


---




[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r228013784
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
@@ -45,6 +46,11 @@ case class CreateHiveTableAsSelectCommand(
 
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] 
= {
 val catalog = sparkSession.sessionState.catalog
+val metastoreCatalog = 
catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
+
+// Whether this table is convertible to data source relation.
+val isConvertible = metastoreCatalog.isConvertible(tableDesc)
--- End diff --

ah makes sense, thanks for trying!


---




[GitHub] spark issue #22809: [SPARK-19851][SQL] Add support for EVERY and ANY (SOME) ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22809
  
LGTM


---




[GitHub] spark pull request #22809: [SPARK-19851][SQL] Add support for EVERY and ANY ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22809#discussion_r228013503
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala ---
@@ -727,4 +728,67 @@ class DataFrameAggregateSuite extends QueryTest with 
SharedSQLContext {
   "grouping expressions: [current_date(None)], value: [key: int, 
value: string], " +
 "type: GroupBy]"))
   }
+
+  def getEveryAggColumn(columnName: String): Column = {
+Column(new 
EveryAgg(Column(columnName).expr).toAggregateExpression(false))
--- End diff --

Since we don't have APIs for them in `functions`, it's not likely users 
will use them with DataFrames. Thus I think we don't need these tests.


---




[GitHub] spark issue #22809: [SPARK-19851][SQL] Add support for EVERY and ANY (SOME) ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22809
  
Can we use these functions in window expressions with this approach?


---




[GitHub] spark pull request #22809: [SPARK-19851][SQL] Add support for EVERY and ANY ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22809#discussion_r227899342
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
 ---
@@ -38,6 +39,18 @@ object ReplaceExpressions extends Rule[LogicalPlan] {
   }
 }
 
+/**
--- End diff --

ah this sounds better!


---




[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22144
  
Maybe we should discuss it in another thread. I think we should officially 
write down the procedure for evaluating a bug report and labeling it as a 
blocker or not. Currently https://spark.apache.org/contributing.html only says 
that data correctness issues should be blockers, which doesn't cover all the 
cases (like this one). It's also inefficient to require a PMC vote for every 
issue to decide whether it's a blocker.


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227867735
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
 ---
@@ -280,59 +281,59 @@ class ScalaReflectionSuite extends SparkFunSuite {
 
   test("serialize and deserialize arbitrary sequence types") {
 import scala.collection.immutable.Queue
-val queueSerializer = serializerFor[Queue[Int]](BoundReference(
-  0, ObjectType(classOf[Queue[Int]]), nullable = false))
-assert(queueSerializer.dataType.head.dataType ==
+val queueSerializer = 
serializerForType(ScalaReflection.localTypeOf[Queue[Int]])
+assert(queueSerializer.dataType ==
   ArrayType(IntegerType, containsNull = false))
 val queueDeserializer = deserializerFor[Queue[Int]]
 assert(queueDeserializer.dataType == ObjectType(classOf[Queue[_]]))
 
 import scala.collection.mutable.ArrayBuffer
-val arrayBufferSerializer = 
serializerFor[ArrayBuffer[Int]](BoundReference(
-  0, ObjectType(classOf[ArrayBuffer[Int]]), nullable = false))
-assert(arrayBufferSerializer.dataType.head.dataType ==
+val arrayBufferSerializer = 
serializerForType(ScalaReflection.localTypeOf[ArrayBuffer[Int]])
+assert(arrayBufferSerializer.dataType ==
   ArrayType(IntegerType, containsNull = false))
 val arrayBufferDeserializer = deserializerFor[ArrayBuffer[Int]]
 assert(arrayBufferDeserializer.dataType == 
ObjectType(classOf[ArrayBuffer[_]]))
   }
 
   test("serialize and deserialize arbitrary map types") {
-val mapSerializer = serializerFor[Map[Int, Int]](BoundReference(
-  0, ObjectType(classOf[Map[Int, Int]]), nullable = false))
-assert(mapSerializer.dataType.head.dataType ==
+val mapSerializer = 
serializerForType(ScalaReflection.localTypeOf[Map[Int, Int]])
+assert(mapSerializer.dataType ==
   MapType(IntegerType, IntegerType, valueContainsNull = false))
 val mapDeserializer = deserializerFor[Map[Int, Int]]
 assert(mapDeserializer.dataType == ObjectType(classOf[Map[_, _]]))
 
 import scala.collection.immutable.HashMap
-val hashMapSerializer = serializerFor[HashMap[Int, 
Int]](BoundReference(
-  0, ObjectType(classOf[HashMap[Int, Int]]), nullable = false))
-assert(hashMapSerializer.dataType.head.dataType ==
+val hashMapSerializer = 
serializerForType(ScalaReflection.localTypeOf[HashMap[Int, Int]])
+assert(hashMapSerializer.dataType ==
   MapType(IntegerType, IntegerType, valueContainsNull = false))
 val hashMapDeserializer = deserializerFor[HashMap[Int, Int]]
 assert(hashMapDeserializer.dataType == ObjectType(classOf[HashMap[_, 
_]]))
 
 import scala.collection.mutable.{LinkedHashMap => LHMap}
-val linkedHashMapSerializer = serializerFor[LHMap[Long, 
String]](BoundReference(
-  0, ObjectType(classOf[LHMap[Long, String]]), nullable = false))
-assert(linkedHashMapSerializer.dataType.head.dataType ==
+val linkedHashMapSerializer = serializerForType(
+ScalaReflection.localTypeOf[LHMap[Long, String]])
+assert(linkedHashMapSerializer.dataType ==
   MapType(LongType, StringType, valueContainsNull = true))
 val linkedHashMapDeserializer = deserializerFor[LHMap[Long, String]]
 assert(linkedHashMapDeserializer.dataType == 
ObjectType(classOf[LHMap[_, _]]))
   }
 
   test("SPARK-22442: Generate correct field names for special characters") 
{
-val serializer = serializerFor[SpecialCharAsFieldData](BoundReference(
-  0, ObjectType(classOf[SpecialCharAsFieldData]), nullable = false))
+val serializer = 
serializerForType(ScalaReflection.localTypeOf[SpecialCharAsFieldData])
--- End diff --

like `deserializerFor` in this suite, let's also create a `serializerFor`
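For example, a suite-local helper could be as small as this (a sketch):

```
import scala.reflect.runtime.universe.TypeTag

private def serializerFor[T: TypeTag]: Expression =
  serializerForType(ScalaReflection.localTypeOf[T])
```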


---




[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22144
  
@tgravescs please quote my full comment instead of part of it.

> After all, this is a bug and a regression from previous releases, like other 1000 we've fixed before.

The point I was making there is that this issue is not one of those that HAVE TO 
block a release, like a correctness issue. I immediately listed the reasons 
afterward for why I don't think it's a blocker.

> hive compatibility is not that important to Spark at this point

I'm sorry if this worries you. It's true that recent development has focused more 
on Spark itself than on Hive compatibility, but that does not apply to the 
existing Hive compatibility features in Spark, and we should still maintain them.

BTW, I removed the `supportPartial` flag because no aggregate functions in 
Spark need it (including the adapted Hive UDAF), but the problem lies in how the 
Hive UDAF is adapted, which was introduced by 
https://issues.apache.org/jira/browse/SPARK-18186


---




[GitHub] spark issue #22730: [SPARK-16775][CORE] Remove deprecated accumulator v1 API...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22730
  
A late LGTM


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227783724
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
 ---
@@ -280,59 +281,59 @@ class ScalaReflectionSuite extends SparkFunSuite {
 
   test("serialize and deserialize arbitrary sequence types") {
 import scala.collection.immutable.Queue
-val queueSerializer = serializerFor[Queue[Int]](BoundReference(
-  0, ObjectType(classOf[Queue[Int]]), nullable = false))
-assert(queueSerializer.dataType.head.dataType ==
+val queueSerializer = 
serializerForType(ScalaReflection.localTypeOf[Queue[Int]])
+assert(queueSerializer.dataType ==
   ArrayType(IntegerType, containsNull = false))
 val queueDeserializer = deserializerFor[Queue[Int]]
 assert(queueDeserializer.dataType == ObjectType(classOf[Queue[_]]))
 
 import scala.collection.mutable.ArrayBuffer
-val arrayBufferSerializer = 
serializerFor[ArrayBuffer[Int]](BoundReference(
-  0, ObjectType(classOf[ArrayBuffer[Int]]), nullable = false))
-assert(arrayBufferSerializer.dataType.head.dataType ==
+val arrayBufferSerializer = 
serializerForType(ScalaReflection.localTypeOf[ArrayBuffer[Int]])
+assert(arrayBufferSerializer.dataType ==
   ArrayType(IntegerType, containsNull = false))
 val arrayBufferDeserializer = deserializerFor[ArrayBuffer[Int]]
 assert(arrayBufferDeserializer.dataType == 
ObjectType(classOf[ArrayBuffer[_]]))
   }
 
   test("serialize and deserialize arbitrary map types") {
-val mapSerializer = serializerFor[Map[Int, Int]](BoundReference(
-  0, ObjectType(classOf[Map[Int, Int]]), nullable = false))
-assert(mapSerializer.dataType.head.dataType ==
+val mapSerializer = 
serializerForType(ScalaReflection.localTypeOf[Map[Int, Int]])
+assert(mapSerializer.dataType ==
   MapType(IntegerType, IntegerType, valueContainsNull = false))
 val mapDeserializer = deserializerFor[Map[Int, Int]]
 assert(mapDeserializer.dataType == ObjectType(classOf[Map[_, _]]))
 
 import scala.collection.immutable.HashMap
-val hashMapSerializer = serializerFor[HashMap[Int, 
Int]](BoundReference(
-  0, ObjectType(classOf[HashMap[Int, Int]]), nullable = false))
-assert(hashMapSerializer.dataType.head.dataType ==
+val hashMapSerializer = 
serializerForType(ScalaReflection.localTypeOf[HashMap[Int, Int]])
+assert(hashMapSerializer.dataType ==
   MapType(IntegerType, IntegerType, valueContainsNull = false))
 val hashMapDeserializer = deserializerFor[HashMap[Int, Int]]
 assert(hashMapDeserializer.dataType == ObjectType(classOf[HashMap[_, 
_]]))
 
 import scala.collection.mutable.{LinkedHashMap => LHMap}
-val linkedHashMapSerializer = serializerFor[LHMap[Long, 
String]](BoundReference(
-  0, ObjectType(classOf[LHMap[Long, String]]), nullable = false))
-assert(linkedHashMapSerializer.dataType.head.dataType ==
+val linkedHashMapSerializer = serializerForType(
+ScalaReflection.localTypeOf[LHMap[Long, String]])
+assert(linkedHashMapSerializer.dataType ==
   MapType(LongType, StringType, valueContainsNull = true))
 val linkedHashMapDeserializer = deserializerFor[LHMap[Long, String]]
 assert(linkedHashMapDeserializer.dataType == 
ObjectType(classOf[LHMap[_, _]]))
   }
 
   test("SPARK-22442: Generate correct field names for special characters") 
{
-val serializer = serializerFor[SpecialCharAsFieldData](BoundReference(
-  0, ObjectType(classOf[SpecialCharAsFieldData]), nullable = false))
+val serializer = 
serializerForType(ScalaReflection.localTypeOf[SpecialCharAsFieldData])
--- End diff --

can we replace all the `serializerForType` with `serializerFor` in this 
suite?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_json

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22237
  
thanks, merging to master!


---




[GitHub] spark issue #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder to get ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22749
  
LGTM except 2 minor comments


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227742062
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
 ---
@@ -351,11 +347,15 @@ class ScalaReflectionSuite extends SparkFunSuite {
 
   test("SPARK-23835: add null check to non-nullable types in Tuples") {
 def numberOfCheckedArguments(deserializer: Expression): Int = {
-  assert(deserializer.isInstanceOf[NewInstance])
-  
deserializer.asInstanceOf[NewInstance].arguments.count(_.isInstanceOf[AssertNotNull])
+  val newInstance = deserializer.collect { case n: NewInstance => 
n}.head
+  newInstance.arguments.count(_.isInstanceOf[AssertNotNull])
 }
-assert(numberOfCheckedArguments(deserializerFor[(Double, Double)]) == 
2)
-assert(numberOfCheckedArguments(deserializerFor[(java.lang.Double, 
Int)]) == 1)
-assert(numberOfCheckedArguments(deserializerFor[(java.lang.Integer, 
java.lang.Integer)]) == 0)
+assert(numberOfCheckedArguments(
+  deserializerForType(ScalaReflection.localTypeOf[(Double, Double)])) 
== 2)
--- End diff --

shall we create a `deserializerFor` method in this test suite to save some 
code diff?


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227739775
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ---
@@ -212,21 +181,90 @@ object ExpressionEncoder {
  * A generic encoder for JVM objects that uses Catalyst Expressions for a 
`serializer`
  * and a `deserializer`.
  *
- * @param schema The schema after converting `T` to a Spark SQL row.
- * @param serializer A set of expressions, one for each top-level field 
that can be used to
- *   extract the values from a raw object into an 
[[InternalRow]].
- * @param deserializer An expression that will construct an object given 
an [[InternalRow]].
+ * @param objSerializer An expression that can be used to encode a raw 
object to corresponding
+ *   Spark SQL representation that can be a primitive 
column, array, map or a
+ *   struct. This represents how Spark SQL generally 
serializes an object of
+ *   type `T`.
+ * @param objDeserializer An expression that will construct an object 
given a Spark SQL
+ *representation. This represents how Spark SQL 
generally deserializes
+ *a serialized value in Spark SQL representation 
back to an object of
+ *type `T`.
  * @param clsTag A classtag for `T`.
  */
 case class ExpressionEncoder[T](
-schema: StructType,
-flat: Boolean,
-serializer: Seq[Expression],
-deserializer: Expression,
+objSerializer: Expression,
+objDeserializer: Expression,
 clsTag: ClassTag[T])
   extends Encoder[T] {
 
-  if (flat) require(serializer.size == 1)
+  /**
+   * A sequence of expressions, one for each top-level field that can be 
used to
+   * extract the values from a raw object into an [[InternalRow]]:
+   * 1. If `serializer` encodes a raw object to a struct, we directly use 
the `serializer`.
+   * 2. For other cases, we create a struct to wrap the `serializer`.
--- End diff --

Let's make these 2 comments more precise
```
1. If `serializer` encodes a raw object to a struct, strip the outer 
if-IsNull and get the CreateNamedStruct
2. For other cases, wrap the single serializer with CreateNamedStruct
```


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227682823
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
 ---
@@ -58,12 +58,10 @@ object RowEncoder {
   def apply(schema: StructType): ExpressionEncoder[Row] = {
 val cls = classOf[Row]
 val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
-val serializer = serializerFor(AssertNotNull(inputObject, Seq("top 
level row object")), schema)
-val deserializer = deserializerFor(schema)
+val serializer = serializerFor(inputObject, schema)
+val deserializer = deserializerFor(GetColumnByOrdinal(0, 
serializer.dataType), schema)
--- End diff --

ah i see, then let's leave it.


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227682672
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala 
---
@@ -434,17 +426,34 @@ object ScalaReflection extends ScalaReflection {
*  * the element type of [[Array]] or [[Seq]]: `array element class: 
"abc.xyz.MyClass"`
*  * the field of [[Product]]: `field (class: "abc.xyz.MyClass", name: 
"myField")`
*/
-  def serializerFor[T : TypeTag](inputObject: Expression): 
CreateNamedStruct = {
-val tpe = localTypeOf[T]
+  def serializerForType(tpe: `Type`,
+  cls: RuntimeClass): Expression = 
ScalaReflection.cleanUpReflectionObjects {
 val clsName = getClassNameFromType(tpe)
 val walkedTypePath = s"""- root class: "$clsName"""" :: Nil
-serializerFor(inputObject, tpe, walkedTypePath) match {
-  case expressions.If(_, _, s: CreateNamedStruct) if 
definedByConstructorParams(tpe) => s
-  case other => CreateNamedStruct(expressions.Literal("value") :: 
other :: Nil)
-}
+
+// The input object to `ExpressionEncoder` is located at first column 
of an row.
+val inputObject = BoundReference(0, dataTypeFor(tpe),
+  nullable = !cls.isPrimitive)
--- End diff --

good, then we don't need `cls` as a parameter.


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227681844
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -1087,7 +1087,7 @@ class Dataset[T] private[sql](
 // Note that we do this before joining them, to enable the join 
operator to return null for one
 // side, in cases like outer-join.
 val left = {
-  val combined = if (this.exprEnc.flat) {
+  val combined = if 
(!this.exprEnc.objSerializer.dataType.isInstanceOf[StructType]) {
--- End diff --

shall we create a method in `ExpressionEncoder` for this check?
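For example, the `isSerializedAsStruct` check that already appears in the refactored encoder could be exposed as a method (a sketch):

```
// In ExpressionEncoder[T]:
def isSerializedAsStruct: Boolean = objSerializer.dataType.isInstanceOf[StructType]
```

Then `Dataset` can simply call `this.exprEnc.isSerializedAsStruct` here.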


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227675880
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
 ---
@@ -58,12 +58,10 @@ object RowEncoder {
   def apply(schema: StructType): ExpressionEncoder[Row] = {
 val cls = classOf[Row]
 val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
-val serializer = serializerFor(AssertNotNull(inputObject, Seq("top 
level row object")), schema)
-val deserializer = deserializerFor(schema)
+val serializer = serializerFor(inputObject, schema)
+val deserializer = deserializerFor(GetColumnByOrdinal(0, 
serializer.dataType), schema)
--- End diff --

In `ScalaReflection`, we create `GetColumnByOrdinal` inside `deserializerFor`; 
shall we follow that here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227673675
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala 
---
@@ -434,17 +426,34 @@ object ScalaReflection extends ScalaReflection {
*  * the element type of [[Array]] or [[Seq]]: `array element class: 
"abc.xyz.MyClass"`
*  * the field of [[Product]]: `field (class: "abc.xyz.MyClass", name: 
"myField")`
*/
-  def serializerFor[T : TypeTag](inputObject: Expression): 
CreateNamedStruct = {
-val tpe = localTypeOf[T]
+  def serializerForType(tpe: `Type`,
+  cls: RuntimeClass): Expression = 
ScalaReflection.cleanUpReflectionObjects {
 val clsName = getClassNameFromType(tpe)
 val walkedTypePath = s"""- root class: "$clsName"""" :: Nil
-serializerFor(inputObject, tpe, walkedTypePath) match {
-  case expressions.If(_, _, s: CreateNamedStruct) if 
definedByConstructorParams(tpe) => s
-  case other => CreateNamedStruct(expressions.Literal("value") :: 
other :: Nil)
-}
+
+// The input object to `ExpressionEncoder` is located at the first column 
of a row.
+val inputObject = BoundReference(0, dataTypeFor(tpe),
+  nullable = !cls.isPrimitive)
+
+serializerFor(inputObject, tpe, walkedTypePath)
   }
 
-  /** Helper for extracting internal fields from a case class. */
+  /**
+   * Returns an expression for serializing the value of an input 
expression into Spark SQL
--- End diff --

do we really need to duplicate the doc in this private method?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227672066
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala 
---
@@ -434,17 +426,34 @@ object ScalaReflection extends ScalaReflection {
*  * the element type of [[Array]] or [[Seq]]: `array element class: 
"abc.xyz.MyClass"`
*  * the field of [[Product]]: `field (class: "abc.xyz.MyClass", name: 
"myField")`
*/
-  def serializerFor[T : TypeTag](inputObject: Expression): 
CreateNamedStruct = {
-val tpe = localTypeOf[T]
+  def serializerForType(tpe: `Type`,
+  cls: RuntimeClass): Expression = 
ScalaReflection.cleanUpReflectionObjects {
 val clsName = getClassNameFromType(tpe)
 val walkedTypePath = s"""- root class: "$clsName"""" :: Nil
-serializerFor(inputObject, tpe, walkedTypePath) match {
-  case expressions.If(_, _, s: CreateNamedStruct) if 
definedByConstructorParams(tpe) => s
-  case other => CreateNamedStruct(expressions.Literal("value") :: 
other :: Nil)
-}
+
+// The input object to `ExpressionEncoder` is located at the first column 
of a row.
+val inputObject = BoundReference(0, dataTypeFor(tpe),
+  nullable = !cls.isPrimitive)
--- End diff --

we just check isPrimitive of the given `cls`, can we check `tpe` directly?
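e.g. something along these lines (the type-symbol check is an assumption, 
not the PR code):
```
// derive nullability from the reflected type instead of the runtime class
val inputObject = BoundReference(0, dataTypeFor(tpe),
  nullable = !tpe.typeSymbol.asClass.isPrimitive)
```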


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_json

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22237
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22812: [SPARK-25817][SQL] Dataset encoder should support combin...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22812
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder to get ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22749
  
hmm, it still has conflict...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21860: [SPARK-24901][SQL]Merge the codegen of RegularHas...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21860#discussion_r227655432
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ---
@@ -831,7 +832,14 @@ case class HashAggregateExec(
 ctx.currentVars = new 
Array[ExprCode](aggregateBufferAttributes.length) ++ input
 
 val updateRowInRegularHashMap: String = {
-  ctx.INPUT_ROW = unsafeRowBuffer
+  val updatedTmpAggBuffer =
+if (isFastHashMapEnabled && !isVectorizedHashMapEnabled) {
+  updatedAggBuffer
--- End diff --

This also simplifies the generated code. We don't need an if-else to assign 
a value to this new variable.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r227654755
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
@@ -45,6 +46,11 @@ case class CreateHiveTableAsSelectCommand(
 
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] 
= {
 val catalog = sparkSession.sessionState.catalog
+val metastoreCatalog = 
catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
+
+// Whether this table is convertible to data source relation.
+val isConvertible = metastoreCatalog.isConvertible(tableDesc)
--- End diff --

I feel `CreateHiveTableAsSelectCommand` is not useful. It simply creates 
the table first and then calls `InsertIntoHiveTable.run`. Maybe we should just 
remove it and implement Hive table CTAS as `Union(CreateTable, 
InsertIntoTable)`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r227654240
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
@@ -45,6 +46,11 @@ case class CreateHiveTableAsSelectCommand(
 
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] 
= {
 val catalog = sparkSession.sessionState.catalog
+val metastoreCatalog = 
catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
+
+// Whether this table is convertible to data source relation.
+val isConvertible = metastoreCatalog.isConvertible(tableDesc)
--- End diff --

another idea: can we move this logic to the `RelationConversions` rule? e.g.
```
case CreateTable(tbl, mode, Some(query)) if DDLUtils.isHiveTable(tbl) && 
isConvertible(tbl) =>
  Union(CreateTable(tbl, mode, None), InsertIntoTable ...)
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r227653464
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala ---
@@ -92,4 +92,18 @@ class HiveParquetSuite extends QueryTest with 
ParquetTest with TestHiveSingleton
   }
 }
   }
+
+  test("SPARK-25271: write empty map into hive parquet table") {
+val testData = 
hiveContext.getHiveFile("data/files/empty_map.dat").getCanonicalFile()
+val sourceTable = "sourceTable"
+val targetTable = "targetTable"
+withTable(sourceTable, targetTable) {
+  sql(s"CREATE TABLE $sourceTable (i int,m map) ROW 
FORMAT DELIMITED FIELDS " +
+"TERMINATED BY ',' COLLECTION ITEMS TERMINATED BY ':' MAP KEYS 
TERMINATED BY '$'")
+  sql(s"LOAD DATA LOCAL INPATH '${testData.toURI}' INTO TABLE 
$sourceTable")
--- End diff --

can we generate the input data with a temp view? E.g. create a DataFrame 
with literals and register it as a temp view.
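Roughly like this (the map's key/value types here are just for illustration):
```
// build the empty-map input with a DataFrame instead of LOAD DATA
import testImplicits._
Seq((1, Map.empty[Int, String])).toDF("i", "m").createOrReplaceTempView(sourceTable)
```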


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21860: [SPARK-24901][SQL]Merge the codegen of RegularHas...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21860#discussion_r227650326
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ---
@@ -831,7 +832,14 @@ case class HashAggregateExec(
 ctx.currentVars = new 
Array[ExprCode](aggregateBufferAttributes.length) ++ input
 
 val updateRowInRegularHashMap: String = {
-  ctx.INPUT_ROW = unsafeRowBuffer
+  val updatedTmpAggBuffer =
+if (isFastHashMapEnabled && !isVectorizedHashMapEnabled) {
+  updatedAggBuffer
--- End diff --

just realized it. Do we create the `updatedAggBuffer` variable only to 
improve the readability of the generated code? It looks to me we don't need 
this variable. Here we can write
```
ctx.INPUT_ROW = if (isFastHashMapEnabled && !isVectorizedHashMapEnabled) 
fastRowBuffer else unsafeRowBuffer
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22144
  
Unfortunately, we didn't drop it intentionally. It was a mistake and we 
should fix it. What I'm trying to avoid is adding back the `supportsPartial` 
flag. We should look into the root cause and see how to fix it better.

I don't know if this policy is written down officially, but I do remember 
we followed this policy many times in the previous releases. Please correct me 
if I am wrong.

I'll list it as a known issue in the 2.4.0 release notes. It would be great 
if someone could investigate the root cause and propose a fix (with a test).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_json

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22237
  
LGTM, pending jenkins.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22812: [SPARK-25817][SQL] Dataset encoder should support combin...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22812
  
cc @michalsenkyr @vofque @viirya 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22812: [SPARK-25817][SQL] Dataset encoder should support...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22812#discussion_r227638941
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1837,8 +1837,6 @@ case class GetArrayFromMap private(
 arrayGetter: MapData => ArrayData,
 elementTypeGetter: MapType => DataType) extends UnaryExpression with 
NonSQLExpression {
 
-  private lazy val encodedFunctionName: String = 
TermName(functionName).encodedName.toString
--- End diff --

this is to address 
https://github.com/apache/spark/pull/22745#discussion_r227407344


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22812: [SPARK-25817][SQL] Dataset encoder should support...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22812#discussion_r227638902
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1090,15 +1096,9 @@ case class CatalystToExternalMap private(
 val tupleLoopValue = ctx.freshName("tupleLoopValue")
 val builderValue = ctx.freshName("builderValue")
 
-val getLength = s"${genInputData.value}.numElements()"
--- End diff --

these are unrelated, but is a followup of 
https://github.com/apache/spark/pull/16986 to address the remaining comments.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22812: [SPARK-25817][SQL] Dataset encoder should support...

2018-10-23 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/22812

[SPARK-25817][SQL] Dataset encoder should support combination of map and 
product type

## What changes were proposed in this pull request?

After https://github.com/apache/spark/pull/22745 , Dataset encoder supports 
the combination of Java bean and map type. This PR fixes the Scala side.

The reason why it didn't work before is that `CatalystToExternalMap` tries to 
get the data type of the input map expression, while the input can be 
unresolved and its data type unknown. To fix it, we can follow 
`UnresolvedMapObjects`: create an `UnresolvedCatalystToExternalMap`, and only 
create `CatalystToExternalMap` once the input map expression is resolved and 
its data type is known.
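For reference, the unresolved placeholder looks roughly like this (field 
names are illustrative, not necessarily the final code):
```
// stays unresolved until the child map expression's MapType is known,
// then the analyzer turns it into a concrete CatalystToExternalMap
case class UnresolvedCatalystToExternalMap(
    child: Expression,
    keyFunction: Expression => Expression,
    valueFunction: Expression => Expression,
    collClass: Class[_]) extends UnaryExpression with Unevaluable {
  override lazy val resolved: Boolean = false
  override def dataType: DataType = throw new UnsupportedOperationException("not resolved")
}
```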

## How was this patch tested?

enable an old test case

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark map

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22812.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22812


commit da31d2602b8e12eb8949336cf14b903c0df731cf
Author: Wenchen Fan 
Date:   2018-10-24T04:21:44Z

Dataset encoder should support combination of map and product type




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22514: [SPARK-25271][SQL] Hive ctas commands should use data so...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22514
  
sounds like a clean solution. please go ahead, thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227618778
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MutableProjectionSuite.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+class MutableProjectionSuite extends SparkFunSuite with 
ExpressionEvalHelper {
+
+  private def createMutableProjection(dataTypes: Array[DataType]): 
MutableProjection = {
+MutableProjection.create(dataTypes.zipWithIndex.map(x => 
BoundReference(x._2, x._1, true)))
+  }
+
+  testBothCodegenAndInterpreted("fixed-length types") {
+val fixedLengthTypes = Array[DataType](
+  BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, 
DoubleType,
+  DateType, TimestampType)
+val proj = createMutableProjection(fixedLengthTypes)
+val inputRow = InternalRow.fromSeq(
+  Seq(false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 5.0, 100, 200L))
+assert(proj(inputRow) === inputRow)
+
+// Use UnsafeRow as buffer
+val numBytes = 
UnsafeRow.calculateBitSetWidthInBytes(fixedLengthTypes.length)
+val unsafeBuffer = UnsafeRow.createFromByteArray(numBytes, 
fixedLengthTypes.length)
+val projUnsafeRow = proj.target(unsafeBuffer)(inputRow)
+assert(FromUnsafeProjection(fixedLengthTypes)(projUnsafeRow) === 
inputRow)
+  }
+
+  testBothCodegenAndInterpreted("variable-length types") {
+val variableLengthTypes = Array(
+  StringType, DecimalType.defaultConcreteType, CalendarIntervalType, 
BinaryType,
+  ArrayType(StringType), MapType(IntegerType, StringType),
+  StructType.fromDDL("a INT, b STRING"), 
ObjectType(classOf[java.lang.Integer]))
+val proj = createMutableProjection(variableLengthTypes)
--- End diff --

shall we also test that it fails if the target row is an `UnsafeRow`?
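e.g. something like this (the exact exception type is a guess):
```
// variable-length values cannot be written into an UnsafeRow target, so this should fail
val unsafeBuffer = UnsafeRow.createFromByteArray(64, variableLengthTypes.length)
intercept[Exception] {
  proj.target(unsafeBuffer)(inputRow)
}
```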


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227618313
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
@@ -143,4 +144,25 @@ object InternalRow {
 case u: UserDefinedType[_] => getAccessor(u.sqlType)
 case _ => (input, ordinal) => input.get(ordinal, dataType)
   }
+
+  /**
+   * Returns a writer for an `InternalRow` with given data type.
+   */
+  def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = 
dt match {
+case BooleanType => (input, v) => input.setBoolean(ordinal, 
v.asInstanceOf[Boolean])
+case ByteType => (input, v) => input.setByte(ordinal, 
v.asInstanceOf[Byte])
+case ShortType => (input, v) => input.setShort(ordinal, 
v.asInstanceOf[Short])
+case IntegerType | DateType => (input, v) => input.setInt(ordinal, 
v.asInstanceOf[Int])
+case LongType | TimestampType => (input, v) => input.setLong(ordinal, 
v.asInstanceOf[Long])
+case FloatType => (input, v) => input.setFloat(ordinal, 
v.asInstanceOf[Float])
+case DoubleType => (input, v) => input.setDouble(ordinal, 
v.asInstanceOf[Double])
+case DecimalType.Fixed(precision, _) =>
+  (input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], 
precision)
+case CalendarIntervalType | BinaryType | _: ArrayType | StringType | 
_: StructType |
+ _: MapType | _: ObjectType =>
+  (input, v) => input.update(ordinal, v)
+case udt: UserDefinedType[_] => getWriter(ordinal, udt.sqlType)
+case NullType => (input, _) => input.setNullAt(ordinal)
+case _ => throw new SparkException(s"Unsupported data type $dt")
--- End diff --

one minor point: the codegen version just calls `row.update` for unmatched 
types, which means it supports object types as well. Shall we follow?
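i.e. the fallback case would simply become:
```
case _ => (input, v) => input.update(ordinal, v)
```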


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22745: [SPARK-25772][SQL] Fix java map of structs deserializati...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22745
  
thanks, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22745#discussion_r227611044
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1787,3 +1788,78 @@ case class ValidateExternalType(child: Expression, 
expected: DataType)
 ev.copy(code = code, isNull = input.isNull)
   }
 }
+
+object GetKeyArrayFromMap {
+
+  /**
+   * Construct an instance of GetArrayFromMap case class
+   * extracting a key array from a Map expression.
+   *
+   * @param child a Map expression to extract a key array from
+   */
+  def apply(child: Expression): Expression = {
+GetArrayFromMap(
+  child,
+  "keyArray",
+  _.keyArray(),
+  { case MapType(kt, _, _) => kt })
+  }
+}
+
+object GetValueArrayFromMap {
+
+  /**
+   * Construct an instance of GetArrayFromMap case class
+   * extracting a value array from a Map expression.
+   *
+   * @param child a Map expression to extract a value array from
+   */
+  def apply(child: Expression): Expression = {
+GetArrayFromMap(
+  child,
+  "valueArray",
+  _.valueArray(),
+  { case MapType(_, vt, _) => vt })
+  }
+}
+
+/**
+ * Extracts a key/value array from a Map expression.
+ *
+ * @param child a Map expression to extract an array from
+ * @param functionName name of the function that is invoked to extract an 
array
+ * @param arrayGetter function extracting `ArrayData` from `MapData`
+ * @param elementTypeGetter function extracting array element `DataType` 
from `MapType`
+ */
+case class GetArrayFromMap private(
+child: Expression,
+functionName: String,
+arrayGetter: MapData => ArrayData,
+elementTypeGetter: MapType => DataType) extends UnaryExpression with 
NonSQLExpression {
+
+  private lazy val encodedFunctionName: String = 
TermName(functionName).encodedName.toString
--- End diff --

We forgot to remove it. It's not a big issue and we don't need to waste one 
more QA round for it. I'll fix it in a followup PR that apply this approach to 
`ScalaReflection`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22144
  
After all, this is a bug and a regression from previous releases, like the 
1000 others we've fixed before. According to the policy, we don't have to 
block the current release because of it, and this bug
1. is a Hive compatibility bug: Spark fails to run some Hive UDAFs
2. fails the job instead of returning a wrong result
3. has an unknown root cause

Thus, I think it's a non-blocker.

I looked into it more and I'm 90% sure this is caused by 
https://issues.apache.org/jira/browse/SPARK-18186 . We should spend more time 
on understanding how Hive executes UDAFs and fix the way we adapt them to 
Spark aggregate functions.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22788: [SPARK-25769][SQL]escape nested columns by backtick each...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22788
  
I agree with the problem described in the PR description that 
`UnresolvedAttribute.sql` is not ideal. But we should just update 
`UnresolvedAttribute.sql`, not the `name` method. `name` is used in other 
places and I think it has no problem.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22514: [SPARK-25271][SQL] Hive ctas commands should use data so...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22514
  
@viirya can you explain the high-level idea about how to fix it? It seems 
hard to fix and we should get a consensus on the approach first.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r227602783
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala ---
@@ -34,11 +34,16 @@ import org.apache.spark.sql.types._
  * @param tableDesc the metadata of the table to be created.
  * @param mode the data writing mode
  * @param query an optional logical plan representing data to write into 
the created table.
+ * @param useExternalSerde whether to use external serde to write data, 
e.g., Hive Serde. Currently
--- End diff --

I don't have a clear idea now, but `CreateTable` is a general logical plan 
for CREATE TABLE. We may even make it public in the data source/catalog APIs 
in the future, so we should not put Hive-specific concepts here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22514: [SPARK-25271][SQL] Hive ctas commands should use data so...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22514
  
Yes, this is a performance regression for users who run CTAS on Hive serde 
tables. It has been a regression since Spark 2.3.0.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22144
  
This is not a PR that is ready to merge. We are likely talking about 
delaying 2.4.0 for multiple weeks because of this issue. Is it really worth 
it?

I'm not sure what's the exact policy, let's ping more people. @rxin  
@srowen @vanzin @felixcheung @gatorsmile 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227433044
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
@@ -143,4 +144,24 @@ object InternalRow {
 case u: UserDefinedType[_] => getAccessor(u.sqlType)
 case _ => (input, ordinal) => input.get(ordinal, dataType)
   }
+
+  /**
+   * Returns a writer for an `InternalRow` with given data type.
+   */
+  def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = 
dt match {
+case BooleanType => (input, v) => input.setBoolean(ordinal, 
v.asInstanceOf[Boolean])
+case ByteType => (input, v) => input.setByte(ordinal, 
v.asInstanceOf[Byte])
+case ShortType => (input, v) => input.setShort(ordinal, 
v.asInstanceOf[Short])
+case IntegerType | DateType => (input, v) => input.setInt(ordinal, 
v.asInstanceOf[Int])
+case LongType | TimestampType => (input, v) => input.setLong(ordinal, 
v.asInstanceOf[Long])
+case FloatType => (input, v) => input.setFloat(ordinal, 
v.asInstanceOf[Float])
+case DoubleType => (input, v) => input.setDouble(ordinal, 
v.asInstanceOf[Double])
+case DecimalType.Fixed(precision, _) =>
+  (input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], 
precision)
+case CalendarIntervalType | BinaryType | _: ArrayType | StringType | 
_: StructType |
+ _: MapType | _: ObjectType | _: UserDefinedType[_] =>
--- End diff --

we should recurse into the UDT.
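i.e. the extra case would just be:
```
case udt: UserDefinedType[_] => getWriter(ordinal, udt.sqlType)
```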


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227432603
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
@@ -143,4 +144,24 @@ object InternalRow {
 case u: UserDefinedType[_] => getAccessor(u.sqlType)
 case _ => (input, ordinal) => input.get(ordinal, dataType)
   }
+
+  /**
+   * Returns a writer for an `InternalRow` with given data type.
+   */
+  def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = 
dt match {
+case BooleanType => (input, v) => input.setBoolean(ordinal, 
v.asInstanceOf[Boolean])
+case ByteType => (input, v) => input.setByte(ordinal, 
v.asInstanceOf[Byte])
+case ShortType => (input, v) => input.setShort(ordinal, 
v.asInstanceOf[Short])
+case IntegerType | DateType => (input, v) => input.setInt(ordinal, 
v.asInstanceOf[Int])
+case LongType | TimestampType => (input, v) => input.setLong(ordinal, 
v.asInstanceOf[Long])
+case FloatType => (input, v) => input.setFloat(ordinal, 
v.asInstanceOf[Float])
+case DoubleType => (input, v) => input.setDouble(ordinal, 
v.asInstanceOf[Double])
+case DecimalType.Fixed(precision, _) =>
+  (input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], 
precision)
+case CalendarIntervalType | BinaryType | _: ArrayType | StringType | 
_: StructType |
+ _: MapType | _: ObjectType | _: UserDefinedType[_] =>
+  (input, v) => input.update(ordinal, v)
+case NullType => (input, v) => {}
--- End diff --

In the codegen version `CodeGenerator.setColumn`, we don't match NullType 
and eventually call `row.update(i, null)`. Shall we follow it and call 
`row.setNullAt` here?
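i.e. the NullType case would become:
```
case NullType => (input, _) => input.setNullAt(ordinal)
```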


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22745#discussion_r227431215
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1787,3 +1788,79 @@ case class ValidateExternalType(child: Expression, 
expected: DataType)
 ev.copy(code = code, isNull = input.isNull)
   }
 }
+
+object GetKeyArrayFromMap {
+
+  /**
+   * Construct an instance of GetArrayFromMap case class
+   * extracting a key array from a Map expression.
+   *
+   * @param child a Map expression to extract a key array from
+   */
+  def apply(child: Expression): Expression = {
+GetArrayFromMap(
+  child,
+  "keyArray",
+  _.keyArray(),
+  { case MapType(kt, _, _) => kt })
+  }
+}
+
+object GetValueArrayFromMap {
+
+  /**
+   * Construct an instance of GetArrayFromMap case class
+   * extracting a value array from a Map expression.
+   *
+   * @param child a Map expression to extract a value array from
+   */
+  def apply(child: Expression): Expression = {
+GetArrayFromMap(
+  child,
+  "valueArray",
+  _.valueArray(),
+  { case MapType(_, vt, _) => vt })
+  }
+}
+
+/**
+ * Extracts a key/value array from a Map expression.
+ *
+ * @param child a Map expression to extract an array from
+ * @param functionName name of the function that is invoked to extract an 
array
+ * @param arrayGetter function extracting `ArrayData` from `MapData`
+ * @param elementTypeGetter function extracting array element `DataType` 
from `MapType`
+ */
+case class GetArrayFromMap private(
+child: Expression,
+functionName: String,
+arrayGetter: MapData => ArrayData,
+elementTypeGetter: MapType => DataType) extends UnaryExpression with 
NonSQLExpression {
+
+  private lazy val encodedFunctionName: String = 
TermName(functionName).encodedName.toString
+
+  lazy val dataType: DataType = {
+val mt: MapType = child.dataType.asInstanceOf[MapType]
+ArrayType(elementTypeGetter(mt))
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+child.dataType match {
+  case MapType(_, _, _) =>
--- End diff --

shall we just use if-else?
```
if (child.dataType.isInstanceOf[MapType]) ... else ...
```
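Spelled out, the check could look roughly like this (reusing the existing 
error message):
```
override def checkInputDataTypes(): TypeCheckResult = {
  if (child.dataType.isInstanceOf[MapType]) {
    TypeCheckResult.TypeCheckSuccess
  } else {
    TypeCheckResult.TypeCheckFailure(
      s"Can't extract array from $child: need map type but got ${child.dataType.catalogString}")
  }
}
```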


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22745#discussion_r227430882
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1787,3 +1788,79 @@ case class ValidateExternalType(child: Expression, 
expected: DataType)
 ev.copy(code = code, isNull = input.isNull)
   }
 }
+
+object GetKeyArrayFromMap {
+
+  /**
+   * Construct an instance of GetArrayFromMap case class
+   * extracting a key array from a Map expression.
+   *
+   * @param child a Map expression to extract a key array from
+   */
+  def apply(child: Expression): Expression = {
+GetArrayFromMap(
+  child,
+  "keyArray",
+  _.keyArray(),
+  { case MapType(kt, _, _) => kt })
+  }
+}
+
+object GetValueArrayFromMap {
+
+  /**
+   * Construct an instance of GetArrayFromMap case class
+   * extracting a value array from a Map expression.
+   *
+   * @param child a Map expression to extract a value array from
+   */
+  def apply(child: Expression): Expression = {
+GetArrayFromMap(
+  child,
+  "valueArray",
+  _.valueArray(),
+  { case MapType(_, vt, _) => vt })
+  }
+}
+
+/**
+ * Extracts a key/value array from a Map expression.
+ *
+ * @param child a Map expression to extract an array from
+ * @param functionName name of the function that is invoked to extract an 
array
+ * @param arrayGetter function extracting `ArrayData` from `MapData`
+ * @param elementTypeGetter function extracting array element `DataType` 
from `MapType`
+ */
+case class GetArrayFromMap private(
+child: Expression,
+functionName: String,
+arrayGetter: MapData => ArrayData,
+elementTypeGetter: MapType => DataType) extends UnaryExpression with 
NonSQLExpression {
+
+  private lazy val encodedFunctionName: String = 
TermName(functionName).encodedName.toString
+
+  lazy val dataType: DataType = {
+val mt: MapType = child.dataType.asInstanceOf[MapType]
+ArrayType(elementTypeGetter(mt))
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+child.dataType match {
+  case MapType(_, _, _) =>
--- End diff --

`case _: MapType`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22745#discussion_r227407344
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1787,3 +1787,72 @@ case class ValidateExternalType(child: Expression, 
expected: DataType)
 ev.copy(code = code, isNull = input.isNull)
   }
 }
+
+object GetKeyArrayFromMap {
+
+  /**
+   * Construct an instance of GetArrayFromMap case class
+   * extracting a key array from a Map expression.
+   *
+   * @param child a Map expression to extract key array from
+   */
+  def apply(child: Expression): Expression = {
+GetArrayFromMap(
+  child,
+  "keyArray",
+  { case MapType(kt, _, _) => kt },
+  _.keyArray())
+  }
+}
+
+object GetValueArrayFromMap {
+
+  /**
+   * Construct an instance of GetArrayFromMap case class
+   * extracting a value array from a Map expression.
+   *
+   * @param child a Map expression to extract value array from
+   */
+  def apply(child: Expression): Expression = {
+GetArrayFromMap(
+  child,
+  "valueArray",
+  { case MapType(_, vt, _) => vt },
+  _.valueArray())
+  }
+}
+
+/**
+ * Extracts a key/value array from a Map expression.
+ *
+ * @param child a Map expression to extract array from
+ * @param source source of array elements, can be `Key` or `Value`
+ */
+case class GetArrayFromMap private(
+child: Expression,
+functionName: String,
+elementTypeGetter: MapType => DataType,
+arrayGetter: MapData => ArrayData) extends UnaryExpression with 
NonSQLExpression {
+
+  private lazy val encodedFunctionName: String = 
TermName(functionName).encodedName.toString
--- End diff --

Then let's save it. Otherwise other reviewers may get confused as well.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22785: [SPARK-25791][SQL] Datatype of serializers in RowEncoder...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22785
  
thanks, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22144
  
Note that the `supportsPartial` flag was dropped at Spark 2.2, not 2.4.

I'm not very familiar with the Hive code so I don't know exactly how it is 
broken. The worst case is that Hive has some UDAFs that don't support partial 
aggregation, and Spark needs to adjust its aggregate framework. Or it's just 
that we incorrectly adapt Hive UDAFs to Spark aggregate functions, and we can 
simply work around it.

I shouldn't state it as a feature; it's the ability of Spark's aggregate 
framework to disable partial aggregation for some functions.

This fix is not ready. We should at least update the doc of 
`HiveUDAFFunction`, so that we can see where we misunderstood the Hive UDAF 
framework.

If we were still at Spark 2.2, we would definitely revert the PRs that 
caused this issue. But it's 2.4 now, and reverting very old commits is not 
safe.

Personally I don't think this is a blocker that we have to fix before 
releasing. It's not a correctness issue, it doesn't impact a lot of users, 
and it's been there for nearly 2 years.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22745: [SPARK-25772][SQL] Fix java map of structs deserializati...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22745
  
LGTM except 2 minor comments


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22745#discussion_r227391572
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1787,3 +1787,72 @@ case class ValidateExternalType(child: Expression, 
expected: DataType)
 ev.copy(code = code, isNull = input.isNull)
   }
 }
+
+object GetKeyArrayFromMap {
+
+  /**
+   * Construct an instance of GetArrayFromMap case class
+   * extracting a key array from a Map expression.
+   *
+   * @param child a Map expression to extract key array from
+   */
+  def apply(child: Expression): Expression = {
+GetArrayFromMap(
+  child,
+  "keyArray",
+  { case MapType(kt, _, _) => kt },
+  _.keyArray())
+  }
+}
+
+object GetValueArrayFromMap {
+
+  /**
+   * Construct an instance of GetArrayFromMap case class
+   * extracting a value array from a Map expression.
+   *
+   * @param child a Map expression to extract value array from
+   */
+  def apply(child: Expression): Expression = {
+GetArrayFromMap(
+  child,
+  "valueArray",
+  { case MapType(_, vt, _) => vt },
+  _.valueArray())
+  }
+}
+
+/**
+ * Extracts a key/value array from a Map expression.
+ *
+ * @param child a Map expression to extract array from
+ * @param source source of array elements, can be `Key` or `Value`
+ */
+case class GetArrayFromMap private(
+child: Expression,
+functionName: String,
+elementTypeGetter: MapType => DataType,
+arrayGetter: MapData => ArrayData) extends UnaryExpression with 
NonSQLExpression {
+
+  private lazy val encodedFunctionName: String = 
TermName(functionName).encodedName.toString
--- End diff --

what is this doing?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22745#discussion_r227391434
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1787,3 +1787,72 @@ case class ValidateExternalType(child: Expression, 
expected: DataType)
 ev.copy(code = code, isNull = input.isNull)
   }
 }
+
+object GetKeyArrayFromMap {
+
+  /**
+   * Construct an instance of GetArrayFromMap case class
+   * extracting a key array from a Map expression.
+   *
+   * @param child a Map expression to extract key array from
+   */
+  def apply(child: Expression): Expression = {
+GetArrayFromMap(
+  child,
+  "keyArray",
+  { case MapType(kt, _, _) => kt },
+  _.keyArray())
+  }
+}
+
+object GetValueArrayFromMap {
+
+  /**
+   * Construct an instance of GetArrayFromMap case class
+   * extracting a value array from a Map expression.
+   *
+   * @param child a Map expression to extract value array from
+   */
+  def apply(child: Expression): Expression = {
+GetArrayFromMap(
+  child,
+  "valueArray",
+  { case MapType(_, vt, _) => vt },
+  _.valueArray())
+  }
+}
+
+/**
+ * Extracts a key/value array from a Map expression.
+ *
+ * @param child a Map expression to extract array from
+ * @param source source of array elements, can be `Key` or `Value`
+ */
+case class GetArrayFromMap private(
+child: Expression,
+functionName: String,
+elementTypeGetter: MapType => DataType,
+arrayGetter: MapData => ArrayData) extends UnaryExpression with 
NonSQLExpression {
+
+  private lazy val encodedFunctionName: String = 
TermName(functionName).encodedName.toString
+
+  lazy val dataType: DataType = {
+child.dataType match {
+  case mt @ MapType(_, _, _) =>
+ArrayType(elementTypeGetter(mt))
+  case other =>
--- End diff --

let's do this check in 
```
override def checkInputDataTypes
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_json

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22237
  
@HyukjinKwon are you working on it? @gengliangwang do you want to take over?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22745#discussion_r227357201
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1787,3 +1787,75 @@ case class ValidateExternalType(child: Expression, 
expected: DataType)
 ev.copy(code = code, isNull = input.isNull)
   }
 }
+
+object GetArrayFromMap {
+  abstract class Source
+  case class Key() extends Source
+  case class Value() extends Source
+}
+
+/**
+ * Extracts a key/value array from a Map expression.
+ *
+ * @param child a Map expression to extract array from
+ * @param source source of array elements, can be `Key` or `Value`
+ */
+case class GetArrayFromMap(
+child: Expression,
+source: GetArrayFromMap.Source) extends Expression with 
NonSQLExpression {
+
+  import GetArrayFromMap._
+
+  private val functionName: String = source match {
+case Key() => "keyArray"
+case Value() => "valueArray"
+  }
+
+  private lazy val encodedFunctionName = 
TermName(functionName).encodedName.toString
+
+  override def nullable: Boolean = child.nullable
+  override def children: Seq[Expression] = child :: Nil
+
+  lazy val dataType: DataType = {
+child.dataType match {
+  case MapType(kt, vt, _) =>
+source match {
+  case Key() => ArrayType(kt)
+  case Value() => ArrayType(vt)
+}
+  case other =>
+throw new RuntimeException(
+  s"Can't extract array from $child: need map type but got 
${other.catalogString}")
+}
+  }
+
+  override def eval(input: InternalRow): Any = {
+val mapObj = child.eval(input)
+if (!mapObj.isInstanceOf[MapData]) {
+  throw new RuntimeException(
+s"Can't extract array from $child: need map type but got 
${mapObj.getClass}")
+}
+if (mapObj == null) {
+  null
+} else {
+  val method = mapObj.getClass.getDeclaredMethod(functionName)
+  method.invoke(mapObj)
+}
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
--- End diff --

make this expression extend `UnaryExpression`, then we can write
```
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
  defineCodeGen(ctx, ev, map => s"$map.$functionName()")
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22745#discussion_r227356448
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1787,3 +1787,75 @@ case class ValidateExternalType(child: Expression, 
expected: DataType)
 ev.copy(code = code, isNull = input.isNull)
   }
 }
+
+object GetArrayFromMap {
+  abstract class Source
+  case class Key() extends Source
+  case class Value() extends Source
+}
+
+/**
+ * Extracts a key/value array from a Map expression.
+ *
+ * @param child a Map expression to extract array from
+ * @param source source of array elements, can be `Key` or `Value`
+ */
+case class GetArrayFromMap(
+child: Expression,
+source: GetArrayFromMap.Source) extends Expression with 
NonSQLExpression {
+
+  import GetArrayFromMap._
+
+  private val functionName: String = source match {
+case Key() => "keyArray"
+case Value() => "valueArray"
+  }
+
+  private lazy val encodedFunctionName = 
TermName(functionName).encodedName.toString
+
+  override def nullable: Boolean = child.nullable
+  override def children: Seq[Expression] = child :: Nil
+
+  lazy val dataType: DataType = {
+child.dataType match {
+  case MapType(kt, vt, _) =>
+source match {
+  case Key() => ArrayType(kt)
+  case Value() => ArrayType(vt)
+}
+  case other =>
+throw new RuntimeException(
+  s"Can't extract array from $child: need map type but got 
${other.catalogString}")
+}
+  }
+
+  override def eval(input: InternalRow): Any = {
+val mapObj = child.eval(input)
+if (!mapObj.isInstanceOf[MapData]) {
+  throw new RuntimeException(
+s"Can't extract array from $child: need map type but got 
${mapObj.getClass}")
+}
+if (mapObj == null) {
+  null
+} else {
+  val method = mapObj.getClass.getDeclaredMethod(functionName)
--- End diff --

well this does reuse the `functionName`, but performance is more important 
here. how about
```
private lazy val arrayGetter: MapData => ArrayData = if (source) ...

def eval...
  arrayGetter( input.asInstanceOf[MapData])
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22745#discussion_r227355778
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1787,3 +1787,75 @@ case class ValidateExternalType(child: Expression, 
expected: DataType)
 ev.copy(code = code, isNull = input.isNull)
   }
 }
+
+object GetArrayFromMap {
+  abstract class Source
+  case class Key() extends Source
+  case class Value() extends Source
+}
+
+/**
+ * Extracts a key/value array from a Map expression.
+ *
+ * @param child a Map expression to extract array from
+ * @param source source of array elements, can be `Key` or `Value`
+ */
+case class GetArrayFromMap(
+child: Expression,
+source: GetArrayFromMap.Source) extends Expression with 
NonSQLExpression {
+
+  import GetArrayFromMap._
+
+  private val functionName: String = source match {
+case Key() => "keyArray"
+case Value() => "valueArray"
+  }
+
+  private lazy val encodedFunctionName = 
TermName(functionName).encodedName.toString
+
+  override def nullable: Boolean = child.nullable
+  override def children: Seq[Expression] = child :: Nil
+
+  lazy val dataType: DataType = {
+child.dataType match {
+  case MapType(kt, vt, _) =>
+source match {
+  case Key() => ArrayType(kt)
+  case Value() => ArrayType(vt)
+}
+  case other =>
+throw new RuntimeException(
+  s"Can't extract array from $child: need map type but got 
${other.catalogString}")
+}
+  }
+
+  override def eval(input: InternalRow): Any = {
+val mapObj = child.eval(input)
+if (!mapObj.isInstanceOf[MapData]) {
--- End diff --

we don't need this check. `eval` is performance critical and we should 
assume there is no bug. We don't have this check in other expressions either.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22745#discussion_r227355335
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1787,3 +1787,75 @@ case class ValidateExternalType(child: Expression, 
expected: DataType)
 ev.copy(code = code, isNull = input.isNull)
   }
 }
+
+object GetArrayFromMap {
+  abstract class Source
+  case class Key() extends Source
+  case class Value() extends Source
--- End diff --

I don't have a strong opinion, but shall we use an enum?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22144
  
My feeling is that Hive compatibility is not that important to Spark at 
this point. *ALL* aggregate functions in Spark (including Spark UDAFs) 
support partial aggregation, but now we would need to complicate the 
aggregation framework to support un-partial-able aggregate functions, only 
for a few Hive UDAFs.

Unless there is a simple workaround, or we can justify that Spark needs 
un-partial-able aggregate functions, IMO it's not worth introducing this 
feature.

BTW this PR doesn't even have a test, so I'm not sure if we can have a 
simple workaround for it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22778: [SPARK-25784][SQL] Infer filters from constraints...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22778#discussion_r227261253
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -171,9 +171,11 @@ abstract class Optimizer(sessionCatalog: 
SessionCatalog)
 // "Extract PythonUDF From JoinCondition".
 Batch("Check Cartesian Products", Once,
   CheckCartesianProducts) :+
-Batch("RewriteSubquery", Once,
+Batch("Rewrite Subquery", Once,
   RewritePredicateSubquery,
   ColumnPruning,
+  InferFiltersFromConstraints,
+  PushDownPredicate,
--- End diff --

looks good, cc @gatorsmile @maryannxue 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22788: [SPARK-25769][SQL]escape nested columns by backtick each...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22788
  
yea, only use JSON if it's a nested column.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r227253732
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinUtils.scala ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, HashClusteredDistribution, HashPartitioning}
+import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+
+object JoinUtils {
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression],
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
+    HashClusteredDistribution(indices.map(leftKeys(_))) ::
+      HashClusteredDistribution(indices.map(rightKeys(_))) :: Nil
+  }
+
+  def requiredChildDistributionForShuffledJoin(
+      partitioningDetection: Boolean,
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression],
+      left: SparkPlan,
+      right: SparkPlan): Seq[Distribution] = {
+    if (!partitioningDetection) {
+      return HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+    }
+
+    val leftPartitioning = left.outputPartitioning
+    val rightPartitioning = right.outputPartitioning
--- End diff --

This is my biggest concern. Currently Spark adds shuffles with a rule, so at this point we can't always know the children's partitioning precisely. We implemented a similar feature in `EnsureRequirements.reorderJoinPredicates`, which is hacky, and we should improve the framework before adding more features like this.
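To make the ordering hazard concrete, here is a self-contained toy sketch in plain Scala — not Spark's planner, and every name in it is made up for illustration: a requirement derived from a child's current `outputPartitioning` can change once a later rule (like `EnsureRequirements`) rewrites that child.

```scala
// Toy model only -- not Spark code. It shows why a requirement computed from a
// child's partitioning "as of now" is fragile: inserting an exchange later
// changes the answer.
sealed trait Partitioning
case object UnknownPartitioning extends Partitioning
case class HashPartitioning(keys: Seq[String]) extends Partitioning

sealed trait Plan { def outputPartitioning: Partitioning }
case class Scan(table: String, outputPartitioning: Partitioning) extends Plan
case class Exchange(keys: Seq[String], child: Plan) extends Plan {
  val outputPartitioning: Partitioning = HashPartitioning(keys)
}
case class Join(leftKeys: Seq[String], rightKeys: Seq[String],
    left: Plan, right: Plan) extends Plan {
  val outputPartitioning: Partitioning = UnknownPartitioning

  // Requirement computed from the children as they look right now.
  def requiredLeftKeys: Seq[String] = left.outputPartitioning match {
    case HashPartitioning(keys) if keys.forall(leftKeys.contains) => keys // reuse existing layout
    case _ => leftKeys                                                    // fall back to all join keys
  }
}

object Demo extends App {
  val scan = Scan("t1", HashPartitioning(Seq("a")))
  val join = Join(Seq("a", "b"), Seq("x", "y"), scan, Scan("t2", UnknownPartitioning))
  println(join.requiredLeftKeys)  // List(a): based on t1's current layout
  // After a rule inserts an exchange above t1, the same computation gives a different answer:
  println(join.copy(left = Exchange(Seq("a", "b"), scan)).requiredLeftKeys)  // List(a, b)
}
```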


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22788: [SPARK-25769][SQL]escape nested columns by backtick each...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22788
  
Yea, I think so; we can even use JSON to be safer. E.g. for `a.b.c.d`, we can encode it as a JSON array `["a", "b", "c", "d"]`. At the data source side, use a JSON parser to read it back.
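A minimal sketch of that round trip, using Jackson purely for illustration (the exact encoding and escaping Spark ends up with may differ):

```scala
import com.fasterxml.jackson.databind.ObjectMapper

object NestedColumnJsonSketch extends App {
  val mapper = new ObjectMapper()

  // Writer side: encode the nested path a -> b -> c.d as a JSON array, so a dot
  // inside a field name can't be confused with a nesting separator.
  val encoded = mapper.writeValueAsString(Array("a", "b", "c.d"))
  println(encoded)  // ["a","b","c.d"]

  // Data source side: parse the array back into the individual field names.
  val fields = mapper.readValue(encoded, classOf[Array[String]])
  println(fields.mkString(", "))  // a, b, c.d
}
```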


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22797: [SPARK-19851][SQL] Add support for EVERY and ANY (SOME) ...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22797
  
yea, just a special rule instead of a general agg function rewrite 
framework.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22775: [SPARK-24709][SQL][FOLLOW-UP] Make schema_of_json...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22775#discussion_r227243187
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -770,8 +776,17 @@ case class SchemaOfJson(
 factory
   }
 
-  override def convert(v: UTF8String): UTF8String = {
-    val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, v)) { parser =>
+  @transient
+  private lazy val json = child.eval().asInstanceOf[UTF8String]
--- End diff --

so `schema_of_json` must be used together with `from_json`? I don't feel strongly about enforcing it. It also seems weird that users would be willing to write a verbose JSON literal in `from_json(..., schema = schema_of_json(...))` instead of a DDL string.
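For reference, a small sketch of the two styles being compared (illustration only, not code from this PR):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{from_json, lit, schema_of_json}

object SchemaOfJsonSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("schema-of-json-sketch").getOrCreate()
  import spark.implicits._

  val df = Seq("""{"a": 1, "b": "x"}""").toDF("json")

  // schema_of_json over a literal sample, fed straight into from_json:
  df.select(from_json($"json", schema_of_json(lit("""{"a": 1, "b": "x"}"""))).as("parsed")).show()

  // The less verbose alternative: a DDL-formatted schema string.
  df.select(from_json($"json", "a INT, b STRING", Map.empty[String, String]).as("parsed")).show()

  spark.stop()
}
```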


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22799: [SPARK-25805][SQL][TEST] Fix test for SPARK-25159

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22799
  
thanks, merging to master/2.4!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r22723
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
 ---
@@ -49,10 +51,54 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable
   def currentValue: InternalRow = mutableRow
 
   override def target(row: InternalRow): MutableProjection = {
+    // If `mutableRow` is `UnsafeRow`, `MutableProjection` accepts fixed-length types only
+    assert(!row.isInstanceOf[UnsafeRow] ||
+      validExprs.forall { case (e, _) => UnsafeRow.isFixedLength(e.dataType) })
     mutableRow = row
     this
   }
 
+  private[this] val fieldWriters = validExprs.map { case (e, i) =>
+    val writer = generateRowWriter(i, e.dataType)
+    if (!e.nullable) {
+      (v: Any) => writer(v)
+    } else {
+      (v: Any) => {
+        if (v == null) {
+          mutableRow.setNullAt(i)
+        } else {
+          writer(v)
+        }
+      }
+    }
+  }
+
+  private def generateRowWriter(ordinal: Int, dt: DataType): Any => Unit = dt match {
+    case BooleanType =>
+      v => mutableRow.setBoolean(ordinal, v.asInstanceOf[Boolean])
+    case ByteType =>
+      v => mutableRow.setByte(ordinal, v.asInstanceOf[Byte])
+    case ShortType =>
+      v => mutableRow.setShort(ordinal, v.asInstanceOf[Short])
+    case IntegerType | DateType =>
+      v => mutableRow.setInt(ordinal, v.asInstanceOf[Int])
+    case LongType | TimestampType =>
+      v => mutableRow.setLong(ordinal, v.asInstanceOf[Long])
+    case FloatType =>
+      v => mutableRow.setFloat(ordinal, v.asInstanceOf[Float])
+    case DoubleType =>
+      v => mutableRow.setDouble(ordinal, v.asInstanceOf[Double])
+    case DecimalType.Fixed(precision, _) =>
+      v => mutableRow.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
+    case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
+        _: MapType | _: UserDefinedType[_] =>
+      v => mutableRow.update(ordinal, v)
+    case NullType =>
+      v => {}
--- End diff --

the corresponding logic in the codegen version simply calls `row.update(null, i)`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


