[GitHub] spark pull request #22775: [SPARK-24709][SQL][FOLLOW-UP] Make schema_of_json...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22775#discussion_r227237973
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -770,8 +776,17 @@ case class SchemaOfJson(
 factory
   }
 
-  override def convert(v: UTF8String): UTF8String = {
-val dt = 
Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, v)) { parser 
=>
+  @transient
+  private lazy val json = child.eval().asInstanceOf[UTF8String]
--- End diff --

is it possible that users want to run `schema_of_json` on a string column of a table?
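
For context, a minimal sketch of the two usages in question, assuming an existing `SparkSession` named `spark` and a made-up column name `json_col` (nothing here is from the PR itself):

```
import org.apache.spark.sql.functions.{col, lit, schema_of_json}

val df = spark.range(1).selectExpr("""'{"id": 1, "tags": ["a"]}' AS json_col""")

// Foldable usage the follow-up assumes: the JSON is a literal, so the schema
// can be inferred once when the plan is built.
df.select(schema_of_json(lit("""{"id": 1, "tags": ["a"]}"""))).show(false)

// The usage being asked about: inferring a schema per row from a string column.
// This is the case the change under review would no longer accept.
df.select(schema_of_json(col("json_col"))).show(false)
```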


---




[GitHub] spark issue #22797: [SPARK-19851][SQL] Add support for EVERY and ANY (SOME) ...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22797
  
Hi @dilipbiswal, sorry for the back and forth, but can you try one more 
approach? Basically we want to evaluate what the simplest logical rewrite looks 
like.

We can create unevaluable EVERY and ANY expressions, and add a rule that 
replaces them with MAX and MIN. This rule can be put at the beginning of the 
optimizer, like `RewriteDistinctAggregates`. We don't need to build a complete 
framework for rewriting aggregate functions.
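
A rough, hedged sketch of that shape: the expression and rule names (`EveryAgg`, `AnyAgg`, `RewriteEveryAndAny`) are made up, and real placeholders would have to be aggregate functions so they survive analysis; this only illustrates the Min/Max rewrite itself.

```
import org.apache.spark.sql.catalyst.expressions.{Expression, Unevaluable}
import org.apache.spark.sql.catalyst.expressions.aggregate.{Max, Min}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.{BooleanType, DataType}

// Unevaluable placeholders: they only exist so EVERY/ANY can be bound during
// parsing/analysis, and must be rewritten away before execution.
case class EveryAgg(child: Expression) extends Expression with Unevaluable {
  override def children: Seq[Expression] = child :: Nil
  override def dataType: DataType = BooleanType
  override def nullable: Boolean = true
}

case class AnyAgg(child: Expression) extends Expression with Unevaluable {
  override def children: Seq[Expression] = child :: Nil
  override def dataType: DataType = BooleanType
  override def nullable: Boolean = true
}

// The rewrite itself: for booleans, EVERY(x) == MIN(x) and ANY(x)/SOME(x) == MAX(x).
object RewriteEveryAndAny extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
    case EveryAgg(child) => Min(child)
    case AnyAgg(child)   => Max(child)
  }
}
```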


---




[GitHub] spark issue #22800: [SPARK-24499][SQL][DOC][follow-up] Fix spelling in doc

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22800
  
thanks, merging to master/2.4!


---




[GitHub] spark issue #22800: [SPARK-24499][SQL][DOC][follow-up] Fix spelling in doc

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22800
  
LGTM


---




[GitHub] spark issue #22790: [SPARK-25793][ML]call SaveLoadV2_0.load for classNameV2_...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22790
  
cc @mengxr @WeichenXu123 how serious is it? shall we treat it as a blocker?


---




[GitHub] spark issue #22799: [SPARK-25805][SQL][TEST] Fix test for SPARK-25159

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22799
  
LGTM


---




[GitHub] spark pull request #22788: [SPARK-25769][SQL]escape nested columns by backti...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22788#discussion_r227203199
  
--- Diff: 
sql/core/src/test/resources/sql-tests/results/columnresolution-negative.sql.out 
---
@@ -81,7 +81,7 @@ SELECT t1.i1 FROM t1, mydb1.t1
 struct<>
 -- !query 9 output
 org.apache.spark.sql.AnalysisException
-Reference 't1.i1' is ambiguous, could be: mydb1.t1.i1, mydb1.t1.i1.; line 
1 pos 7
+Reference '`t1`.`i1`' is ambiguous, could be: mydb1.t1.i1, mydb1.t1.i1.; 
line 1 pos 7
--- End diff --

These examples only make sense when we have the outer backticks. e.g. 
`'t1.i1'` is good.


---




[GitHub] spark pull request #22788: [SPARK-25769][SQL]escape nested columns by backti...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22788#discussion_r227202767
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
 ---
@@ -99,7 +99,7 @@ case class UnresolvedTableValuedFunction(
 case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute 
with Unevaluable {
 
   def name: String =
-nameParts.map(n => if (n.contains(".")) s"`$n`" else n).mkString(".")
+nameParts.map(n => if (nameParts.length > 1 || n.contains(".")) 
s"`$n`" else n).mkString(".")
--- End diff --

I don't think this is better for `name`; we should update `sql`, though.


---




[GitHub] spark issue #22512: [SPARK-25498][SQL] InterpretedMutableProjection should h...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22512
  
LGTM, we also need a unit test


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227202171
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
@@ -140,6 +141,14 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSQLContext {
 val input = fileToString(new File(testCase.inputFile))
 
 val (comments, code) = input.split("\n").partition(_.startsWith("--"))
+
+// Runs all the tests on both codegen-only and interpreter modes. 
Since explain results differ
+// when `WHOLESTAGE_CODEGEN_ENABLED` disabled, we don't run these 
tests now.
+val codegenConfigSets = Array(("false", "NO_CODEGEN"), ("true", 
"CODEGEN_ONLY")).map {
+  case (wholeStageCodegenEnabled, codegenFactoryMode) =>
+Array( // SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> 
wholeStageCodegenEnabled,
--- End diff --

If `wholeStageCodegenEnabled` is not used, let's not complicate the code 
now.


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227200656
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
 ---
@@ -49,10 +51,54 @@ class InterpretedMutableProjection(expressions: 
Seq[Expression]) extends Mutable
   def currentValue: InternalRow = mutableRow
 
   override def target(row: InternalRow): MutableProjection = {
+// If `mutableRow` is `UnsafeRow`, `MutableProjection` accepts 
fixed-length types only
+assert(!row.isInstanceOf[UnsafeRow] ||
+  validExprs.forall { case (e, _) => 
UnsafeRow.isFixedLength(e.dataType) })
 mutableRow = row
 this
   }
 
+  private[this] val fieldWriters = validExprs.map { case (e, i) =>
+val writer = generateRowWriter(i, e.dataType)
+if (!e.nullable) {
+  (v: Any) => writer(v)
+} else {
+  (v: Any) => {
+if (v == null) {
+  mutableRow.setNullAt(i)
+} else {
+  writer(v)
+}
+  }
+}
+  }
+
+  private def generateRowWriter(ordinal: Int, dt: DataType): Any => Unit = 
dt match {
--- End diff --

we have `InternalRow.getAccessor`, shall we move this method there too?
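
For reference, a hedged sketch of what a writer-side counterpart to `InternalRow.getAccessor` could look like; the name `getWriter` and the exact signature are assumptions, mirroring the `generateRowWriter` in this diff:

```
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

// Returns a function that writes an already-converted value into the given ordinal.
// Null handling is left to the caller, as in the projection above.
def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match {
  case BooleanType              => (row, v) => row.setBoolean(ordinal, v.asInstanceOf[Boolean])
  case ByteType                 => (row, v) => row.setByte(ordinal, v.asInstanceOf[Byte])
  case ShortType                => (row, v) => row.setShort(ordinal, v.asInstanceOf[Short])
  case IntegerType | DateType   => (row, v) => row.setInt(ordinal, v.asInstanceOf[Int])
  case LongType | TimestampType => (row, v) => row.setLong(ordinal, v.asInstanceOf[Long])
  case FloatType                => (row, v) => row.setFloat(ordinal, v.asInstanceOf[Float])
  case DoubleType               => (row, v) => row.setDouble(ordinal, v.asInstanceOf[Double])
  case DecimalType.Fixed(precision, _) =>
    (row, v) => row.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
  case NullType                 => (row, _) => row.setNullAt(ordinal)
  case _                        => (row, v) => row.update(ordinal, v)
}
```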


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227200458
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
 ---
@@ -49,10 +51,54 @@ class InterpretedMutableProjection(expressions: 
Seq[Expression]) extends Mutable
   def currentValue: InternalRow = mutableRow
 
   override def target(row: InternalRow): MutableProjection = {
+// If `mutableRow` is `UnsafeRow`, `MutableProjection` accepts 
fixed-length types only
+assert(!row.isInstanceOf[UnsafeRow] ||
+  validExprs.forall { case (e, _) => 
UnsafeRow.isFixedLength(e.dataType) })
 mutableRow = row
 this
   }
 
+  private[this] val fieldWriters = validExprs.map { case (e, i) =>
+val writer = generateRowWriter(i, e.dataType)
+if (!e.nullable) {
+  (v: Any) => writer(v)
+} else {
+  (v: Any) => {
+if (v == null) {
+  mutableRow.setNullAt(i)
+} else {
+  writer(v)
+}
+  }
+}
+  }
+
+  private def generateRowWriter(ordinal: Int, dt: DataType): Any => Unit = 
dt match {
+case BooleanType =>
+  v => mutableRow.setBoolean(ordinal, v.asInstanceOf[Boolean])
+case ByteType =>
+  v => mutableRow.setByte(ordinal, v.asInstanceOf[Byte])
+case ShortType =>
+  v => mutableRow.setShort(ordinal, v.asInstanceOf[Short])
+case IntegerType | DateType =>
+  v => mutableRow.setInt(ordinal, v.asInstanceOf[Int])
+case LongType | TimestampType =>
+  v => mutableRow.setLong(ordinal, v.asInstanceOf[Long])
+case FloatType =>
+  v => mutableRow.setFloat(ordinal, v.asInstanceOf[Float])
+case DoubleType =>
+  v => mutableRow.setDouble(ordinal, v.asInstanceOf[Double])
+case DecimalType.Fixed(precision, _) =>
+  v => mutableRow.setDecimal(ordinal, v.asInstanceOf[Decimal], 
precision)
+case CalendarIntervalType | BinaryType | _: ArrayType | StringType | 
_: StructType |
+ _: MapType | _: UserDefinedType[_] =>
+  v => mutableRow.update(ordinal, v)
+case NullType =>
+  v => {}
--- End diff --

shall we call `mutableRow.setNullAt`?


---




[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22745#discussion_r227037566
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
 ---
@@ -278,24 +278,20 @@ object JavaTypeInference {
 
   case _ if mapType.isAssignableFrom(typeToken) =>
 val (keyType, valueType) = mapKeyValueType(typeToken)
-val keyDataType = inferDataType(keyType)._1
-val valueDataType = inferDataType(valueType)._1
 
 val keyData =
   Invoke(
-MapObjects(
+UnresolvedMapObjects(
   p => deserializerFor(keyType, Some(p)),
-  Invoke(getPath, "keyArray", ArrayType(keyDataType)),
-  keyDataType),
+  UnresolvedGetArrayFromMap(getPath, GetArrayFromMap.Key())),
--- End diff --

Yea we can write eval and doGenCode from scratch. It's also more efficient 
since we can omit the useless try-catch in `Invoke`.

e.g.
```
// from UnaryExpression
override def nullSafeEval(input: Any) = input.asInstanceOf[MapData].keyArray()

override def doGenCode = defineCodeGen(ctx, ev, c => s"$c.keyArray()")
```


---




[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19788
  
BTW, let's add a config for this feature. We may enable adaptive execution 
by default in the future, and we should still allow users to run Spark with the 
legacy shuffle service. We should also throw a meaningful exception asking 
users to turn off this config or upgrade the shuffle service when an 
incompatibility happens.


---




[GitHub] spark issue #22786: [SPARK-25764][ML][EXAMPLES] Update BisectingKMeans examp...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22786
  
also cc @WeichenXu123 


---




[GitHub] spark issue #20820: [SPARK-23676][SQL]Support left join codegen in SortMerge...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/20820
  
This is good to have, but we should follow `BroadcastHashJoinExec` and make 
the implementation more structured, e.g. `codegenInner`, `codegenOuter`, etc.


---




[GitHub] spark pull request #22788: [SPARK-25769][SQL]change nested columns from `a.b...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22788#discussion_r226988117
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
---
@@ -2702,7 +2702,7 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
 
   val e = intercept[AnalysisException](sql("SELECT v.i from (SELECT i 
FROM v)"))
   assert(e.message ==
-"cannot resolve '`v.i`' given input columns: 
[__auto_generated_subquery_name.i]")
--- End diff --

I think the problem here is the outermost backticks; do you know where we 
add them?


---




[GitHub] spark pull request #22788: [SPARK-25769][SQL]change nested columns from `a.b...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22788#discussion_r226987977
  
--- Diff: 
sql/core/src/test/resources/sql-tests/results/columnresolution-negative.sql.out 
---
@@ -81,7 +81,7 @@ SELECT t1.i1 FROM t1, mydb1.t1
 struct<>
 -- !query 9 output
 org.apache.spark.sql.AnalysisException
-Reference 't1.i1' is ambiguous, could be: mydb1.t1.i1, mydb1.t1.i1.; line 
1 pos 7
+Reference '`t1`.`i1`' is ambiguous, could be: mydb1.t1.i1, mydb1.t1.i1.; 
line 1 pos 7
--- End diff --

why is the new format better? it's more verbose, isn't it?


---




[GitHub] spark issue #17520: [WIP][SPARK-19712][SQL] Move PullupCorrelatedPredicates ...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/17520
  
is it time to revisit it?


---




[GitHub] spark pull request #22047: [SPARK-19851] Add support for EVERY and ANY (SOME...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22047#discussion_r226985475
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -403,6 +403,28 @@ def countDistinct(col, *cols):
 return Column(jc)
 
 
+def every(col):
--- End diff --

+1 for option 1


---




[GitHub] spark issue #21860: [SPARK-24901][SQL]Merge the codegen of RegularHashMap an...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/21860
  
LGTM except the naming


---




[GitHub] spark pull request #21860: [SPARK-24901][SQL]Merge the codegen of RegularHas...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21860#discussion_r226981919
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ---
@@ -744,6 +744,7 @@ case class HashAggregateExec(
 val unsafeRowKeys = unsafeRowKeyCode.value
 val unsafeRowBuffer = ctx.freshName("unsafeRowAggBuffer")
 val fastRowBuffer = ctx.freshName("fastAggBuffer")
+val updateAggBuffer = ctx.freshName("updateAggBuffer")
--- End diff --

`updatedAggBuffer`?


---




[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22745#discussion_r226981527
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
 ---
@@ -278,24 +278,20 @@ object JavaTypeInference {
 
   case _ if mapType.isAssignableFrom(typeToken) =>
 val (keyType, valueType) = mapKeyValueType(typeToken)
-val keyDataType = inferDataType(keyType)._1
-val valueDataType = inferDataType(valueType)._1
 
 val keyData =
   Invoke(
-MapObjects(
+UnresolvedMapObjects(
   p => deserializerFor(keyType, Some(p)),
-  Invoke(getPath, "keyArray", ArrayType(keyDataType)),
-  keyDataType),
+  UnresolvedGetArrayFromMap(getPath, GetArrayFromMap.Key())),
--- End diff --

Seems we don't need to make it unresolved

```
case class GetArrayFromMap(map: Expression, getKey: Boolean) extends Expression {
  override def inputTypes = Seq(MapType)

  override def dataType = {
    val MapType(kt, vt) = map.dataType.asInstanceOf[MapType]
    if (getKey) kt else vt
  }

  override def eval...

  override def doCodegen...
}

```
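
Filling in the elided parts, a hedged version of that sketch could look like the following. Note the result type has to be an array and `MapType` also carries `valueContainsNull`; the nullability choices here are assumptions, not the final design:

```
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.util.MapData
import org.apache.spark.sql.types._

// `child` is the map-typed expression (called `map` in the sketch above).
case class GetArrayFromMap(child: Expression, getKey: Boolean)
  extends UnaryExpression with ExpectsInputTypes {

  override def inputTypes: Seq[AbstractDataType] = Seq(MapType)

  override def dataType: DataType = {
    val MapType(kt, vt, valueContainsNull) = child.dataType
    if (getKey) ArrayType(kt, containsNull = false) else ArrayType(vt, valueContainsNull)
  }

  // keyArray()/valueArray() are the existing MapData accessors, so no try-catch
  // from `Invoke` is needed.
  override protected def nullSafeEval(input: Any): Any = {
    val map = input.asInstanceOf[MapData]
    if (getKey) map.keyArray() else map.valueArray()
  }

  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val method = if (getKey) "keyArray" else "valueArray"
    defineCodeGen(ctx, ev, c => s"$c.$method()")
  }
}
```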


---




[GitHub] spark pull request #22785: [SPARK-25791][SQL] Datatype of serializers in Row...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22785#discussion_r226978705
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
 ---
@@ -187,7 +187,7 @@ object RowEncoder {
 val convertedField = if (field.nullable) {
   If(
 Invoke(inputObject, "isNullAt", BooleanType, Literal(index) :: 
Nil),
-Literal.create(null, field.dataType),
+Literal.create(null, fieldValue.dataType),
--- End diff --

can we add a comment to explain it? `field.dataType` can be different from 
`fieldValue.dataType`, because we strip UDT


---




[GitHub] spark pull request #22785: [SPARK-25791][SQL] Datatype of serializers in Row...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22785#discussion_r226978368
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
 ---
@@ -171,7 +171,7 @@ object RowEncoder {
 
   if (inputObject.nullable) {
 If(IsNull(inputObject),
-  Literal.create(null, inputType),
+  Literal.create(null, nonNullOutput.dataType),
--- End diff --

I see. For this case it makes no difference, but it makes the code clearer.


---




[GitHub] spark pull request #22785: [SPARK-25791][SQL] Datatype of serializers in Row...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22785#discussion_r226978811
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
 ---
@@ -273,6 +273,16 @@ class RowEncoderSuite extends 
CodegenInterpretedPlanTest {
 assert(e4.getMessage.contains("java.lang.String is not a valid 
external type"))
   }
 
+  test("SPARK-25791: Datatype of serializers should be accessible") {
+val udtSQLType = new StructType().add("a", IntegerType)
+val pythonUDT = new PythonUserDefinedType(udtSQLType, "pyUDT", 
"serializedPyClass")
--- End diff --

can we reproduce the bug with a normal UDT?


---




[GitHub] spark pull request #22785: [SPARK-25791][SQL] Datatype of serializers in Row...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22785#discussion_r226977895
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
 ---
@@ -171,7 +171,7 @@ object RowEncoder {
 
   if (inputObject.nullable) {
 If(IsNull(inputObject),
-  Literal.create(null, inputType),
+  Literal.create(null, nonNullOutput.dataType),
--- End diff --

what's the difference?


---




[GitHub] spark pull request #22785: [SPARK-25791][SQL] Datatype of serializers in Row...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22785#discussion_r226977568
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
 ---
@@ -273,6 +273,16 @@ class RowEncoderSuite extends 
CodegenInterpretedPlanTest {
 assert(e4.getMessage.contains("java.lang.String is not a valid 
external type"))
   }
 
+  test("SPARK-25791: Datatype of serializers should be accessible") {
+val udtSQLType = new StructType().add("a", IntegerType)
+val pythonUDT = new PythonUserDefinedType(udtSQLType, "pyUDT", 
"serializedPyClass")
+val schema = new StructType().add("pythonUDT", pythonUDT, true)
+val encoder = RowEncoder(schema)
+// scalastyle:off println
+encoder.serializer.foreach(s => println(s.dataType))
--- End diff --

we shouldn't print in a test; can we use an assert?


---




[GitHub] spark issue #21402: SPARK-24355 Spark external shuffle server improvement to...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/21402
  
shall we close it since #22173 is merged?


---




[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19788
  
Hi @yucai, good points on the performance concerns. Let's go with the 
previous approach: 
https://github.com/apache/spark/pull/19788#issuecomment-366887404

sorry for the back and forth!


---




[GitHub] spark issue #22575: [SPARK-24630][SS] Support SQLStreaming in Spark

2018-10-21 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22575
  
do we have a full story about stream sql? is the `STREAM` keyword the only 
difference between stream sql and normal sql?

also cc @tdas @zsxwing 


---




[GitHub] spark issue #22575: [SPARK-24630][SS] Support SQLStreaming in Spark

2018-10-21 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22575
  
do we have a full story about stream sql? is the `STREAM` keyword the only 
difference between stream sql and normal sql? how could users define watermark 
with SQL?

also cc @tdas @zsxwing 


---




[GitHub] spark issue #22575: [SPARK-24630][SS] Support SQLStreaming in Spark

2018-10-21 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22575
  
Do we have a full story about streaming SQL? is the `STREAM` keyword the 
only difference between stream sql and normal sql?

also cc @tdas @zsxwing 


---




[GitHub] spark issue #22756: [SPARK-25758][ML] Deprecate computeCost on BisectingKMea...

2018-10-20 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22756
  
reverted from master. Let's move the discussion to 
https://github.com/apache/spark/pull/22764


---




[GitHub] spark issue #22763: [SPARK-25764][ML][EXAMPLES] Update BisectingKMeans examp...

2018-10-20 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22763
  
ah I see. @mgaido91 can you resubmit it and update the description? The 
method is not deprecated now.


---




[GitHub] spark issue #22501: [SPARK-25492][TEST] Refactor WideSchemaBenchmark to use ...

2018-10-20 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22501
  
retest this please


---




[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

2018-10-20 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22547
  
Let's move the high-level discussion to 
https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing


---




[GitHub] spark pull request #22750: [SPARK-25747][SQL] remove ColumnarBatchScan.needs...

2018-10-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22750#discussion_r226819049
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -168,10 +168,11 @@ case class FileSourceScanExec(
 
   // Note that some vals referring the file-based relation are lazy 
intentionally
   // so that this plan can be canonicalized on executor side too. See 
SPARK-23731.
-  override lazy val supportsBatch: Boolean = 
relation.fileFormat.supportBatch(
-relation.sparkSession, StructType.fromAttributes(output))
+  override lazy val supportsBatch: Boolean = {
+relation.fileFormat.supportBatch(relation.sparkSession, schema)
+  }
 
-  override lazy val needsUnsafeRowConversion: Boolean = {
+  private lazy val needsUnsafeRowConversion: Boolean = {
 if (relation.fileFormat.isInstanceOf[ParquetSource]) {
--- End diff --

Our Parquet reader has one more feature than ORC: if the vectorized reader is 
on but whole-stage codegen is off, we can still read Parquet in batches and 
return `ColumnarRow`s. For ORC, if whole-stage codegen is off, we also turn off 
the vectorized reader, so ORC never returns `ColumnarRow`.

However, I just found that this Parquet reader feature is gone. Let's send 
a new PR to fix it or remove it completely.
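
For anyone who wants to check this, a minimal way to exercise that combination (both config keys are existing SQL confs; the path and the `spark` session are placeholders):

```
// Whole-stage codegen off, vectorized Parquet reader on: with the feature
// described above, the scan should still produce batches / ColumnarRow.
spark.conf.set("spark.sql.codegen.wholeStage", "false")
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
spark.read.parquet("/tmp/some_parquet_table").show()
```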


---




[GitHub] spark issue #22756: [SPARK-25758][ML] Deprecate computeCost on BisectingKMea...

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22756
  
shall we revert it from master as well? At least we need to update the 
message `This method is deprecated and will be removed in 3.0.0.`


---




[GitHub] spark pull request #22781: [MINOR][DOC] Fix the building document to describ...

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22781#discussion_r226816630
  
--- Diff: docs/building-spark.md ---
@@ -12,7 +12,7 @@ redirect_from: "building-with-maven.html"
 ## Apache Maven
 
 The Maven-based build is the build of reference for Apache Spark.
-Building Spark using Maven requires Maven 3.3.9 or newer and Java 8+.
+Building Spark using Maven requires Maven 3.3.9 or newer and Java 8.
--- End diff --

let's change it to the expected Maven version, thanks!


---




[GitHub] spark issue #22501: [SPARK-25492][TEST] Refactor WideSchemaBenchmark to use ...

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22501
  
seems jenkins is broken, cc @shaneknapp 
```
Command "/tmp/tmp.JfFHaoRFPU/3.5/bin/python -c "import setuptools, 
tokenize;__file__='/home/jenkins/workspace/SparkPullRequestBuilder/python/setup.py';f=getattr(tokenize,
 'open', open)(__file__);code=f.read().replace('\r\n', 
'\n');f.close();exec(compile(code, __file__, 'exec'))" develop --no-deps" 
failed with error code 1 in 
/home/jenkins/workspace/SparkPullRequestBuilder/python/
You are using pip version 10.0.1, however version 18.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
Cleaning up temporary directory - /tmp/tmp.JfFHaoRFPU
[error] running 
/home/jenkins/workspace/SparkPullRequestBuilder/dev/run-pip-tests ; received 
return code 1
```


---




[GitHub] spark issue #22750: [SPARK-25747][SQL] remove ColumnarBatchScan.needsUnsafeR...

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22750
  
retest this please


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226812577
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/Format.java ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * The base interface for data source v2. Implementations must have a 
public, 0-arg constructor.
+ *
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ */
+@InterfaceStability.Evolving
+public interface Format extends DataSourceV2 {
--- End diff --

the write API has not been migrated and still needs `DataSourceV2`


---




[GitHub] spark pull request #22750: [SPARK-25747][SQL] remove ColumnarBatchScan.needs...

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22750#discussion_r226812447
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala 
---
@@ -164,12 +162,11 @@ private[sql] trait ColumnarBatchScan extends 
CodegenSupport {
 val outputVars = output.zipWithIndex.map { case (a, i) =>
--- End diff --

good catch!


---




[GitHub] spark issue #22501: [SPARK-25492][TEST] Refactor WideSchemaBenchmark to use ...

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22501
  
thank you guys for refreshing the benchmarks and results! It's very helpful.

If possible, can we post the perf regressions we found in the umbrella 
JIRA? Then people can see whether each regression is reasonable (if we have 
addressed it) or investigate how it was introduced.

Thanks!


---




[GitHub] spark issue #22763: [SPARK-25764][ML][EXAMPLES] Update BisectingKMeans examp...

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22763
  
This has been reverted from master/2.4


---




[GitHub] spark issue #22764: [SPARK-25765][ML] Add training cost to BisectingKMeans s...

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22764
  
Since this PR is a little more complicated than we expected, we decided not 
to have it in 2.4.0. I'm not sure if we can treat it as a special case and put 
it in 2.4.1, cc @mengxr.

Anyway, the other 2 related PRs (deprecating the API and updating the 
example) are reverted. We need to think about what we should do if we can only 
do this in 3.0.


---




[GitHub] spark pull request #22466: [SPARK-25464][SQL] Create Database to the locatio...

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22466#discussion_r226655438
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 ---
@@ -207,6 +207,14 @@ class SessionCatalog(
   "you cannot create a database with this name.")
 }
 validateName(dbName)
+// SPARK-25464 fail if DB location exists and is not empty
+val dbPath = new Path(dbDefinition.locationUri)
+val fs = dbPath.getFileSystem(hadoopConf)
+if (!externalCatalog.databaseExists(dbName) && fs.exists(dbPath)
+  && fs.listStatus(dbPath).nonEmpty) {
--- End diff --

if the behavior needs to be consistent across all the external catalog 
implementations (it should be), then I think putting it here is reasonable.

But listing files might be too expensive (e.g. several hours); is there a 
better way to check whether a directory is empty?
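
One possible alternative, sketched under the assumption that the iterator-based listing can stop at the first entry instead of materializing the whole `listStatus` array (`dbDefinition` and `hadoopConf` are the values already used in the diff):

```
import org.apache.hadoop.fs.Path

val dbPath = new Path(dbDefinition.locationUri)
val fs = dbPath.getFileSystem(hadoopConf)
// listLocatedStatus returns a RemoteIterator, so only the first entry is fetched here.
val dbPathNonEmpty = fs.exists(dbPath) && fs.listLocatedStatus(dbPath).hasNext
```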


---




[GitHub] spark pull request #22764: [SPARK-25765][ML] Add training cost to BisectingK...

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22764#discussion_r226652377
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
 ---
@@ -225,13 +227,14 @@ object BisectingKMeansModel extends 
Loader[BisectingKMeansModel] {
   assert(formatVersion == thisFormatVersion)
   val rootId = (metadata \ "rootId").extract[Int]
   val distanceMeasure = (metadata \ "distanceMeasure").extract[String]
+  val trainingCost = (metadata \ "trainingCost").extract[Double]
--- End diff --

Do other models have this problem? I was told that this change just follows 
what we did for other models before.


---




[GitHub] spark issue #22766: [SPARK-25768][SQL] fix constant argument expecting UDAFs

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22766
  
thanks, merging to master/2.4/2.3!


---




[GitHub] spark issue #22743: [SPARK-25740][SQL] Refactor DetermineTableStats to inval...

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22743
  
> Datasource table will not cache in tableRelationCache.

I don't think so. Spark caches data source tables in `FindDataSourceTable`.


---




[GitHub] spark pull request #22666: [SPARK-25672][SQL] schema_of_csv() - schema infer...

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22666#discussion_r226641023
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -3886,6 +3886,31 @@ object functions {
 withExpr(new CsvToStructs(e.expr, schema.expr, options.asScala.toMap))
   }
 
+  /**
+   * Parses a column containing a CSV string and infers its schema.
+   *
+   * @param e a string column containing CSV data.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def schema_of_csv(e: Column): Column = withExpr(new SchemaOfCsv(e.expr))
+
+  /**
+   * Parses a column containing a CSV string and infers its schema using 
options.
+   *
+   * @param e a string column containing CSV data.
+   * @param options options to control how the CSV is parsed. accepts the 
same options and the
+   *json data source. See [[DataFrameReader#csv]].
+   * @return a column with string literal containing schema in DDL format.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def schema_of_csv(e: Column, options: java.util.Map[String, String]): 
Column = {
--- End diff --

shall we have an API with a Scala `Map`?


---




[GitHub] spark pull request #22666: [SPARK-25672][SQL] schema_of_csv() - schema infer...

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22666#discussion_r226640860
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
 ---
@@ -155,4 +155,15 @@ class CsvExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with P
 }.getCause
 assert(exception.getMessage.contains("from_csv() doesn't support the 
DROPMALFORMED mode"))
   }
+
+  test("infer schema of CSV strings") {
+checkEvaluation(new SchemaOfCsv(Literal.create("1,abc")), 
"struct<_c0:int,_c1:string>")
+  }
+
+  test("infer schema of CSV strings by using options") {
+checkEvaluation(
+  new SchemaOfCsv(Literal.create("1|abc"),
+CreateMap(Seq(Literal.create("delimiter"), Literal.create("|",
--- End diff --

the main constructor of `SchemaOfCsv` accepts `Map[String, String]` 
directly, shall we use that?


---




[GitHub] spark pull request #22666: [SPARK-25672][SQL] schema_of_csv() - schema infer...

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22666#discussion_r226640362
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 ---
@@ -60,7 +63,7 @@ case class CsvToStructs(
   // Used in `FunctionRegistry`
   def this(child: Expression, schema: Expression, options: Map[String, 
String]) =
 this(
-  schema = ExprUtils.evalSchemaExpr(schema),
+  schema = ExprUtils.evalSchemaExpr(schema).asInstanceOf[StructType],
--- End diff --

why do we need `asInstanceOf`?


---




[GitHub] spark issue #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF constructor sig...

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22732
  
thanks, merging to master/2.4!


---




[GitHub] spark issue #22750: [SPARK-25747][SQL] remove ColumnarBatchScan.needsUnsafeR...

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22750
  
`DataSourceScanExec` does not have `needsUnsafeRowConversion`


---




[GitHub] spark issue #22743: [SPARK-25740][SQL] Refactor DetermineTableStats to inval...

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22743
  
why is it only a problem for Hive tables?


---




[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r226520350
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala 
---
@@ -932,6 +935,23 @@ trait ScalaReflection {
 tpe.dealias.erasure.typeSymbol.asClass.fullName
   }
 
+  /**
+   * Returns the nullability of the input parameter types of the scala 
function object.
+   *
+   * Note that this only works with Scala 2.11, and the information 
returned may be inaccurate if
+   * used with a different Scala version.
--- End diff --

shall we explicitly return a seq of `true` if it's not Scala 2.11? Then the 
behavior is more predictable than `may be inaccurate`.


---




[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226519284
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ---
@@ -43,10 +44,11 @@ import org.apache.spark.util.Utils
  *to the name `value`.
  */
 object ExpressionEncoder {
+
   def apply[T : TypeTag](): ExpressionEncoder[T] = {
-// We convert the not-serializable TypeTag into StructType and 
ClassTag.
 val mirror = ScalaReflection.mirror
-val tpe = typeTag[T].in(mirror).tpe
+val tpe = ScalaReflection.localTypeOf[T]
--- End diff --

`localTypeOf` has a `dealias` at the end.


---




[GitHub] spark issue #22743: [SPARK-25740][SQL] Refactor DetermineTableStats to inval...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22743
  
can you explain more about how this happens?


---




[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r226517584
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
 ---
@@ -39,29 +42,29 @@ import org.apache.spark.sql.types.DataType
  * @param nullable  True if the UDF can return null value.
  * @param udfDeterministic  True if the UDF is deterministic. 
Deterministic UDF returns same result
  *  each time it is invoked with a particular 
input.
- * @param nullableTypes which of the inputTypes are nullable (i.e. not 
primitive)
  */
 case class ScalaUDF(
 function: AnyRef,
 dataType: DataType,
 children: Seq[Expression],
+inputsNullSafe: Seq[Boolean],
 inputTypes: Seq[DataType] = Nil,
 udfName: Option[String] = None,
 nullable: Boolean = true,
-udfDeterministic: Boolean = true,
-nullableTypes: Seq[Boolean] = Nil)
+udfDeterministic: Boolean = true)
   extends Expression with ImplicitCastInputTypes with NonSQLExpression 
with UserDefinedExpression {
 
   // The constructor for SPARK 2.1 and 2.2
   def this(
   function: AnyRef,
   dataType: DataType,
   children: Seq[Expression],
+  inputsNullSafe: Seq[Boolean],
--- End diff --

SGTM


---




[GitHub] spark pull request #22501: [SPARK-25492][TEST] Refactor WideSchemaBenchmark ...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22501#discussion_r226516354
  
--- Diff: sql/core/benchmarks/WideSchemaBenchmark-results.txt ---
@@ -1,117 +1,145 @@
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6
-Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz

+
+parsing large select expressions

+
 
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 parsing large select:Best/Avg Time(ms)Rate(M/s)   
Per Row(ns)   Relative
 

-1 select expressions 2 /4  0.0 
2050147.0   1.0X
-100 select expressions   6 /7  0.0 
6123412.0   0.3X
-2500 select expressions135 /  141  0.0   
134623148.0   0.0X
+1 select expressions 2 /4  0.0 
1934953.0   1.0X
+100 select expressions   4 /5  0.0 
3659399.0   0.5X
+2500 select expressions 68 /   76  0.0
68278937.0   0.0X
 
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6
-Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz
 

+
+many column field read and write

+
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 many column field r/w:   Best/Avg Time(ms)Rate(M/s)   
Per Row(ns)   Relative
 

-1 cols x 10 rows (read in-mem)  16 /   18  6.3 
158.6   1.0X
-1 cols x 10 rows (exec in-mem)  17 /   19  6.0 
166.7   1.0X
-1 cols x 10 rows (read parquet) 24 /   26  4.3 
235.1   0.7X
-1 cols x 10 rows (write parquet)81 /   85  1.2 
811.3   0.2X
-100 cols x 1000 rows (read in-mem)  17 /   19  6.0 
166.2   1.0X
-100 cols x 1000 rows (exec in-mem)  25 /   27  4.0 
249.2   0.6X
-100 cols x 1000 rows (read parquet) 23 /   25  4.4 
226.0   0.7X
-100 cols x 1000 rows (write parquet)83 /   87  1.2 
831.0   0.2X
-2500 cols x 40 rows (read in-mem)  132 /  137  0.8 
   1322.9   0.1X
-2500 cols x 40 rows (exec in-mem)  326 /  330  0.3 
   3260.6   0.0X
-2500 cols x 40 rows (read parquet) 831 /  839  0.1 
   8305.8   0.0X
-2500 cols x 40 rows (write parquet)237 /  245  0.4 
   2372.6   0.1X
-
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6
-Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz
+1 cols x 10 rows (read in-mem)  22 /   25  4.6 
219.4   1.0X
+1 cols x 10 rows (exec in-mem)  22 /   28  4.5 
223.8   1.0X
+1 cols x 10 rows (read parquet) 45 /   49  2.2 
449.6   0.5X
+1 cols x 10 rows (write parquet)   204 /  223  0.5 
   2044.4   0.1X
--- End diff --

I have no idea how this happens. Can you create a JIRA ticket to 
investigate this regression?


---




[GitHub] spark issue #22763: [SPARK-25764][ML][EXAMPLES] Update BisectingKMeans examp...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22763
  
thanks, merging to master/2.4!


---




[GitHub] spark pull request #22764: [SPARK-25765][ML] Add training cost to BisectingK...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22764#discussion_r226512051
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala ---
@@ -310,4 +317,6 @@ class BisectingKMeansSummary private[clustering] (
 predictionCol: String,
 featuresCol: String,
 k: Int,
-numIter: Int) extends ClusteringSummary(predictions, predictionCol, 
featuresCol, k, numIter)
+numIter: Int,
+@Since("2.4.0") val trainingCost: Double)
--- End diff --

SGTM


---




[GitHub] spark issue #22750: [SPARK-25747][SQL] remove ColumnarBatchScan.needsUnsafeR...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22750
  
which description is inaccurate?


---




[GitHub] spark pull request #22766: [SPARK-25768][SQL] fix constant argument expectin...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22766#discussion_r226511589
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
---
@@ -340,39 +340,39 @@ private[hive] case class HiveUDAFFunction(
 resolver.getEvaluator(parameterInfo)
   }
 
-  // The UDAF evaluator used to consume raw input rows and produce partial 
aggregation results.
-  @transient
-  private lazy val partial1ModeEvaluator = newEvaluator()
+  private case class PartialEvaluator(
+evaluator: GenericUDAFEvaluator,
+objectInspector: ObjectInspector)
 
+  // The UDAF evaluator used to consume raw input rows and produce partial 
aggregation results.
   // Hive `ObjectInspector` used to inspect partial aggregation results.
   @transient
-  private val partialResultInspector = partial1ModeEvaluator.init(
-GenericUDAFEvaluator.Mode.PARTIAL1,
-inputInspectors
-  )
+  private lazy val partial1Mode = {
+val evaluator = newEvaluator()
+PartialEvaluator(evaluator, 
evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors))
+  }
 
   // The UDAF evaluator used to merge partial aggregation results.
   @transient
   private lazy val partial2ModeEvaluator = {
 val evaluator = newEvaluator()
-evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, 
Array(partialResultInspector))
+evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, 
Array(partial1Mode.objectInspector))
 evaluator
   }
 
   // Spark SQL data type of partial aggregation results
   @transient
-  private lazy val partialResultDataType = 
inspectorToDataType(partialResultInspector)
+  private lazy val partialResultDataType = 
inspectorToDataType(partial1Mode.objectInspector)
 
   // The UDAF evaluator used to compute the final result from a partial 
aggregation result objects.
-  @transient
-  private lazy val finalModeEvaluator = newEvaluator()
-
   // Hive `ObjectInspector` used to inspect the final aggregation result 
object.
   @transient
-  private val returnInspector = finalModeEvaluator.init(
-GenericUDAFEvaluator.Mode.FINAL,
-Array(partialResultInspector)
-  )
+  private lazy val finalMode = {
--- End diff --

ah, it's also used in final mode; then maybe `HiveEvaluator` is a better 
name than `PartialEvaluator`.


---




[GitHub] spark pull request #22766: [SPARK-25768][SQL] fix constant argument expectin...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22766#discussion_r226511506
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
---
@@ -340,39 +340,39 @@ private[hive] case class HiveUDAFFunction(
 resolver.getEvaluator(parameterInfo)
   }
 
-  // The UDAF evaluator used to consume raw input rows and produce partial 
aggregation results.
-  @transient
-  private lazy val partial1ModeEvaluator = newEvaluator()
+  private case class PartialEvaluator(
+evaluator: GenericUDAFEvaluator,
+objectInspector: ObjectInspector)
 
+  // The UDAF evaluator used to consume raw input rows and produce partial 
aggregation results.
   // Hive `ObjectInspector` used to inspect partial aggregation results.
   @transient
-  private val partialResultInspector = partial1ModeEvaluator.init(
-GenericUDAFEvaluator.Mode.PARTIAL1,
-inputInspectors
-  )
+  private lazy val partial1Mode = {
+val evaluator = newEvaluator()
+PartialEvaluator(evaluator, 
evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors))
+  }
 
   // The UDAF evaluator used to merge partial aggregation results.
   @transient
   private lazy val partial2ModeEvaluator = {
 val evaluator = newEvaluator()
-evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, 
Array(partialResultInspector))
+evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, 
Array(partial1Mode.objectInspector))
 evaluator
   }
 
   // Spark SQL data type of partial aggregation results
   @transient
-  private lazy val partialResultDataType = 
inspectorToDataType(partialResultInspector)
+  private lazy val partialResultDataType = 
inspectorToDataType(partial1Mode.objectInspector)
 
   // The UDAF evaluator used to compute the final result from a partial 
aggregation result objects.
-  @transient
-  private lazy val finalModeEvaluator = newEvaluator()
-
   // Hive `ObjectInspector` used to inspect the final aggregation result 
object.
   @transient
-  private val returnInspector = finalModeEvaluator.init(
-GenericUDAFEvaluator.Mode.FINAL,
-Array(partialResultInspector)
-  )
+  private lazy val finalMode = {
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22766: [SPARK-25768][SQL] fix constant argument expectin...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22766#discussion_r226511479
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
---
@@ -340,39 +340,39 @@ private[hive] case class HiveUDAFFunction(
 resolver.getEvaluator(parameterInfo)
   }
 
-  // The UDAF evaluator used to consume raw input rows and produce partial 
aggregation results.
-  @transient
-  private lazy val partial1ModeEvaluator = newEvaluator()
+  private case class PartialEvaluator(
+evaluator: GenericUDAFEvaluator,
+objectInspector: ObjectInspector)
 
+  // The UDAF evaluator used to consume raw input rows and produce partial 
aggregation results.
   // Hive `ObjectInspector` used to inspect partial aggregation results.
   @transient
-  private val partialResultInspector = partial1ModeEvaluator.init(
-GenericUDAFEvaluator.Mode.PARTIAL1,
-inputInspectors
-  )
+  private lazy val partial1Mode = {
--- End diff --

`partial1ModeEvaluator`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22764: [SPARK-25765][ML] Add training cost to BisectingK...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22764#discussion_r226384584
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala ---
@@ -310,4 +317,6 @@ class BisectingKMeansSummary private[clustering] (
 predictionCol: String,
 featuresCol: String,
 k: Int,
-numIter: Int) extends ClusteringSummary(predictions, predictionCol, 
featuresCol, k, numIter)
+numIter: Int,
+@Since("2.4.0") val trainingCost: Double)
--- End diff --

oh wait. If the final goal is to have a consistent ML API in 3.0, do we 
have to put this new API in 2.4?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22766: [SPARK-25768][SQL] fix constant argument expectin...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22766#discussion_r226375347
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
---
@@ -339,40 +339,38 @@ private[hive] case class HiveUDAFFunction(
 val parameterInfo = new 
SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
 resolver.getEvaluator(parameterInfo)
   }
+  
+  private case class Mode(evaluator: GenericUDAFEvaluator, 
objectInspector: ObjectInspector)
--- End diff --

maybe just `PartialEvaluator`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22766: [SPARK-25768][SQL] fix constant argument expecting UDAFs

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22766
  
OK to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22764: [SPARK-25765][ML] Add training cost to BisectingK...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22764#discussion_r226372701
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala ---
@@ -310,4 +317,6 @@ class BisectingKMeansSummary private[clustering] (
 predictionCol: String,
 featuresCol: String,
 k: Int,
-numIter: Int) extends ClusteringSummary(predictions, predictionCol, 
featuresCol, k, numIter)
+numIter: Int,
+@Since("2.4.0") val trainingCost: Double)
--- End diff --

This PR targets 2.4; see more context at 
https://github.com/apache/spark/pull/22756


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22547
  
A major part of this PR is updating the existing streaming sources, which is 
just moving code around. There are 3 things we need to pay attention to during 
review:
1. the naming and documentation of the new interfaces.
2. the new streaming query planning workflow; see the PR description for 
details.
3. the updated tests; make sure nothing is broken.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226363445
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -381,7 +390,7 @@ class StreamSuite extends StreamTest {
 
   test("insert an extraStrategy") {
 try {
-  spark.experimental.extraStrategies = TestStrategy :: Nil
+  spark.experimental.extraStrategies = CustomStrategy :: Nil
--- End diff --

Since we need to do temporary planning for streaming queries, we can't allow 
a custom strategy to remove streaming leaf nodes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226363020
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -154,21 +159,25 @@ class StreamSuite extends StreamTest {
   }
 
   test("SPARK-20432: union one stream with itself") {
-val df = 
spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
-val unioned = df.union(df)
-withTempDir { outputDir =>
-  withTempDir { checkpointDir =>
-val query =
-  unioned
-.writeStream.format("parquet")
-.option("checkpointLocation", checkpointDir.getAbsolutePath)
-.start(outputDir.getAbsolutePath)
-try {
-  query.processAllAvailable()
-  val outputDf = 
spark.read.parquet(outputDir.getAbsolutePath).as[Long]
-  checkDatasetUnorderly[Long](outputDf, (0L to 10L).union((0L to 
10L)).toArray: _*)
-} finally {
-  query.stop()
+val v1Source = 
spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
+val v2Source = 
spark.readStream.format(classOf[FakeFormat].getName).load().select("a")
+
+Seq(v1Source, v2Source).foreach { df =>
--- End diff --

improve this test to make sure v2 also works.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226361309
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
 ---
@@ -319,29 +307,18 @@ class RateSourceSuite extends StreamTest {
   "rate source does not support user-specified schema"))
   }
 
-  test("continuous in registry") {
--- End diff --

We don't need this test now. With the new `Format` abstraction, the lookup 
logic is unified between microbatch and continuous mode.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226359031
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchInputStream.scala
 ---
@@ -60,6 +59,14 @@ class RateStreamMicroBatchReadSupport(options: 
DataSourceOptions, checkpointLoca
 s" is $maxSeconds, but 'rampUpTimeSeconds' is $rampUpTimeSeconds.")
   }
 
+  private val numPartitions = {
--- End diff --

moved from 
https://github.com/apache/spark/pull/22547/files#diff-6cd4de793a1c68d3d9415a246823b55eL151


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226355931
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -90,6 +140,8 @@ class ContinuousExecution(
 do {
   runContinuous(sparkSessionForStream)
 } while (state.updateAndGet(stateUpdate) == ACTIVE)
+
+stopSources()
--- End diff --

With the new abstraction, we should only stop sources when the streaming 
query ends, instead of at each reconfiguration.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226338580
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java 
---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.BatchScan;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * A mix-in interface for {@link Table}. Table implementations can mixin 
this interface to
+ * provide data reading ability for batch processing.
+ */
+@InterfaceStability.Evolving
+public interface SupportsBatchRead extends Table {
+
+  /**
+   * Creates a {@link BatchScan} instance with a {@link ScanConfig} and 
user-specified options.
+   *
+   * @param config a {@link ScanConfig} which may contains operator 
pushdown information.
+   * @param options the user-specified options, which is same as the one 
used to create the
+   *{@link ScanConfigBuilder} that built the given {@link 
ScanConfig}.
--- End diff --

Another choice is to let `ScanConfig` carry the options. But `ScanConfig` is 
an interface, and doing this would put more work on the user side, so I 
decided to pass the options again here. Feedback is welcome!
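
To make the trade-off concrete, here is a rough sketch of the two shapes under 
discussion. The `createBatchScan` method, the `options` accessor, and the 
trait names below are illustrative only, not the actual API; package paths are 
taken from the diff above.

```scala
import org.apache.spark.sql.sources.v2.{DataSourceOptions, Table}
import org.apache.spark.sql.sources.v2.reader.{BatchScan, ScanConfig}

// Shape proposed here: the options are passed to the table again, next to
// the ScanConfig that was built from them.
trait SupportsBatchReadSketch extends Table {
  def createBatchScan(config: ScanConfig, options: DataSourceOptions): BatchScan
}

// Alternative: ScanConfig itself carries the options, which pushes the
// bookkeeping onto every ScanConfig implementation.
trait ScanConfigWithOptions extends ScanConfig {
  def options: DataSourceOptions
}
```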


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22764: [SPARK-25765][ML] Add training cost to BisectingKMeans s...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22764
  
does the example need to be updated with this new API?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEncoder to...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22749
  
I like this idea! Waiting for the tests to pass.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226301402
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ---
@@ -212,21 +183,88 @@ object ExpressionEncoder {
  * A generic encoder for JVM objects that uses Catalyst Expressions for a 
`serializer`
  * and a `deserializer`.
  *
- * @param schema The schema after converting `T` to a Spark SQL row.
- * @param serializer A set of expressions, one for each top-level field 
that can be used to
- *   extract the values from a raw object into an 
[[InternalRow]].
- * @param deserializer An expression that will construct an object given 
an [[InternalRow]].
+ * @param objSerializer An expression that can be used to encode a raw 
object to corresponding
+ *   Spark SQL representation that can be a primitive 
column, array, map or a
+ *   struct. This represents how Spark SQL generally 
serializes an object of
+ *   type `T`.
+ * @param objDeserializer An expression that will construct an object 
given a Spark SQL
+ *representation. This represents how Spark SQL 
generally deserializes
+ *a serialized value in Spark SQL representation 
back to an object of
+ *type `T`.
  * @param clsTag A classtag for `T`.
  */
 case class ExpressionEncoder[T](
-schema: StructType,
-flat: Boolean,
-serializer: Seq[Expression],
-deserializer: Expression,
+objSerializer: Expression,
+objDeserializer: Expression,
 clsTag: ClassTag[T])
   extends Encoder[T] {
 
-  if (flat) require(serializer.size == 1)
+  /**
+   * A set of expressions, one for each top-level field that can be used to
+   * extract the values from a raw object into an [[InternalRow]]:
+   * 1. If `serializer` encodes a raw object to a struct, we directly use 
the `serializer`.
+   * 2. For other cases, we create a struct to wrap the `serializer`.
+   */
+  val serializer: Seq[NamedExpression] = {
+val serializedAsStruct = 
objSerializer.dataType.isInstanceOf[StructType]
+val clsName = Utils.getSimpleName(clsTag.runtimeClass)
+
+if (serializedAsStruct) {
+  val nullSafeSerializer = objSerializer.transformUp {
+case r: BoundReference =>
+  // For input object of Product type, we can't encode it to row 
if it's null, as Spark SQL
+  // doesn't allow top-level row to be null, only its columns can 
be null.
+  AssertNotNull(r, Seq("top level Product or row object"))
+  }
+  nullSafeSerializer match {
+case If(_, _, s: CreateNamedStruct) => s
+case s: CreateNamedStruct => s
+case _ =>
+  throw new RuntimeException(s"class $clsName has unexpected 
serializer: $objSerializer")
+  }
+} else {
+  // For other input objects like primitive, array, map, etc., we 
construct a struct to wrap
+  // the serializer which is a column of an row.
+  CreateNamedStruct(Literal("value") :: objSerializer :: Nil)
+}
+  }.flatten
+
+  /**
+   * Returns an expression that can be used to deserialize an input row to 
an object of type `T`
+   * with a compatible schema. Fields of the row will be extracted using 
`UnresolvedAttribute`.
+   * of the same name as the constructor arguments.
+   *
+   * For complex objects that are encoded to structs, Fields of the struct 
will be extracted using
+   * `GetColumnByOrdinal` with corresponding ordinal.
+   */
+  val deserializer: Expression = {
+val serializedAsStruct = 
objSerializer.dataType.isInstanceOf[StructType]
+
+if (serializedAsStruct) {
+  // We serialized this kind of objects to root-level row. The input 
of general deserializer
+  // is a `GetColumnByOrdinal(0)` expression to extract first column 
of a row. We need to
+  // transform attributes accessors.
+  objDeserializer.transform {
+case UnresolvedExtractValue(GetColumnByOrdinal(0, _),
+Literal(part: UTF8String, StringType)) =>
+  UnresolvedAttribute.quoted(part.toString)
+case GetStructField(GetColumnByOrdinal(0, dt), ordinal, _) =>
+  GetColumnByOrdinal(ordinal, dt)
+case If(IsNull(GetColumnByOrdinal(0, _)), _, n: NewInstance) => n
+case If(IsNull(GetColumnByOrdinal(0, _)), _, i: 
InitializeJavaBean) => i
+  }
+} else {
+  // For other input objects like primitive, array, map, etc., we 
deserialize the first column
+ 

[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226301139
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ---
@@ -212,21 +183,88 @@ object ExpressionEncoder {
  * A generic encoder for JVM objects that uses Catalyst Expressions for a 
`serializer`
  * and a `deserializer`.
  *
- * @param schema The schema after converting `T` to a Spark SQL row.
- * @param serializer A set of expressions, one for each top-level field 
that can be used to
- *   extract the values from a raw object into an 
[[InternalRow]].
- * @param deserializer An expression that will construct an object given 
an [[InternalRow]].
+ * @param objSerializer An expression that can be used to encode a raw 
object to corresponding
+ *   Spark SQL representation that can be a primitive 
column, array, map or a
+ *   struct. This represents how Spark SQL generally 
serializes an object of
+ *   type `T`.
+ * @param objDeserializer An expression that will construct an object 
given a Spark SQL
+ *representation. This represents how Spark SQL 
generally deserializes
+ *a serialized value in Spark SQL representation 
back to an object of
+ *type `T`.
  * @param clsTag A classtag for `T`.
  */
 case class ExpressionEncoder[T](
-schema: StructType,
-flat: Boolean,
-serializer: Seq[Expression],
-deserializer: Expression,
+objSerializer: Expression,
+objDeserializer: Expression,
 clsTag: ClassTag[T])
   extends Encoder[T] {
 
-  if (flat) require(serializer.size == 1)
+  /**
+   * A set of expressions, one for each top-level field that can be used to
+   * extract the values from a raw object into an [[InternalRow]]:
+   * 1. If `serializer` encodes a raw object to a struct, we directly use 
the `serializer`.
+   * 2. For other cases, we create a struct to wrap the `serializer`.
+   */
+  val serializer: Seq[NamedExpression] = {
+val serializedAsStruct = 
objSerializer.dataType.isInstanceOf[StructType]
+val clsName = Utils.getSimpleName(clsTag.runtimeClass)
+
+if (serializedAsStruct) {
+  val nullSafeSerializer = objSerializer.transformUp {
+case r: BoundReference =>
+  // For input object of Product type, we can't encode it to row 
if it's null, as Spark SQL
+  // doesn't allow top-level row to be null, only its columns can 
be null.
+  AssertNotNull(r, Seq("top level Product or row object"))
+  }
+  nullSafeSerializer match {
+case If(_, _, s: CreateNamedStruct) => s
+case s: CreateNamedStruct => s
+case _ =>
+  throw new RuntimeException(s"class $clsName has unexpected 
serializer: $objSerializer")
+  }
+} else {
+  // For other input objects like primitive, array, map, etc., we 
construct a struct to wrap
+  // the serializer which is a column of an row.
+  CreateNamedStruct(Literal("value") :: objSerializer :: Nil)
+}
+  }.flatten
+
+  /**
+   * Returns an expression that can be used to deserialize an input row to 
an object of type `T`
+   * with a compatible schema. Fields of the row will be extracted using 
`UnresolvedAttribute`.
+   * of the same name as the constructor arguments.
+   *
+   * For complex objects that are encoded to structs, Fields of the struct 
will be extracted using
+   * `GetColumnByOrdinal` with corresponding ordinal.
+   */
+  val deserializer: Expression = {
+val serializedAsStruct = 
objSerializer.dataType.isInstanceOf[StructType]
+
+if (serializedAsStruct) {
+  // We serialized this kind of objects to root-level row. The input 
of general deserializer
+  // is a `GetColumnByOrdinal(0)` expression to extract first column 
of a row. We need to
+  // transform attributes accessors.
+  objDeserializer.transform {
+case UnresolvedExtractValue(GetColumnByOrdinal(0, _),
+Literal(part: UTF8String, StringType)) =>
+  UnresolvedAttribute.quoted(part.toString)
+case GetStructField(GetColumnByOrdinal(0, dt), ordinal, _) =>
+  GetColumnByOrdinal(ordinal, dt)
+case If(IsNull(GetColumnByOrdinal(0, _)), _, n: NewInstance) => n
+case If(IsNull(GetColumnByOrdinal(0, _)), _, i: 
InitializeJavaBean) => i
+  }
+} else {
+  // For other input objects like primitive, array, map, etc., we 
deserialize the first column
+ 

[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226299441
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ---
@@ -212,21 +183,88 @@ object ExpressionEncoder {
  * A generic encoder for JVM objects that uses Catalyst Expressions for a 
`serializer`
  * and a `deserializer`.
  *
- * @param schema The schema after converting `T` to a Spark SQL row.
- * @param serializer A set of expressions, one for each top-level field 
that can be used to
- *   extract the values from a raw object into an 
[[InternalRow]].
- * @param deserializer An expression that will construct an object given 
an [[InternalRow]].
+ * @param objSerializer An expression that can be used to encode a raw 
object to corresponding
+ *   Spark SQL representation that can be a primitive 
column, array, map or a
+ *   struct. This represents how Spark SQL generally 
serializes an object of
+ *   type `T`.
+ * @param objDeserializer An expression that will construct an object 
given a Spark SQL
+ *representation. This represents how Spark SQL 
generally deserializes
+ *a serialized value in Spark SQL representation 
back to an object of
+ *type `T`.
  * @param clsTag A classtag for `T`.
  */
 case class ExpressionEncoder[T](
-schema: StructType,
-flat: Boolean,
-serializer: Seq[Expression],
-deserializer: Expression,
+objSerializer: Expression,
+objDeserializer: Expression,
 clsTag: ClassTag[T])
   extends Encoder[T] {
 
-  if (flat) require(serializer.size == 1)
+  /**
+   * A set of expressions, one for each top-level field that can be used to
--- End diff --

set -> sequence


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226298803
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ---
@@ -212,21 +183,88 @@ object ExpressionEncoder {
  * A generic encoder for JVM objects that uses Catalyst Expressions for a 
`serializer`
  * and a `deserializer`.
  *
- * @param schema The schema after converting `T` to a Spark SQL row.
- * @param serializer A set of expressions, one for each top-level field 
that can be used to
- *   extract the values from a raw object into an 
[[InternalRow]].
- * @param deserializer An expression that will construct an object given 
an [[InternalRow]].
+ * @param objSerializer An expression that can be used to encode a raw 
object to corresponding
+ *   Spark SQL representation that can be a primitive 
column, array, map or a
+ *   struct. This represents how Spark SQL generally 
serializes an object of
+ *   type `T`.
+ * @param objDeserializer An expression that will construct an object 
given a Spark SQL
+ *representation. This represents how Spark SQL 
generally deserializes
+ *a serialized value in Spark SQL representation 
back to an object of
+ *type `T`.
  * @param clsTag A classtag for `T`.
  */
 case class ExpressionEncoder[T](
-schema: StructType,
-flat: Boolean,
-serializer: Seq[Expression],
-deserializer: Expression,
+objSerializer: Expression,
+objDeserializer: Expression,
 clsTag: ClassTag[T])
   extends Encoder[T] {
 
-  if (flat) require(serializer.size == 1)
+  /**
+   * A set of expressions, one for each top-level field that can be used to
+   * extract the values from a raw object into an [[InternalRow]]:
+   * 1. If `serializer` encodes a raw object to a struct, we directly use 
the `serializer`.
+   * 2. For other cases, we create a struct to wrap the `serializer`.
+   */
+  val serializer: Seq[NamedExpression] = {
+val serializedAsStruct = 
objSerializer.dataType.isInstanceOf[StructType]
+val clsName = Utils.getSimpleName(clsTag.runtimeClass)
+
+if (serializedAsStruct) {
+  val nullSafeSerializer = objSerializer.transformUp {
+case r: BoundReference =>
+  // For input object of Product type, we can't encode it to row 
if it's null, as Spark SQL
+  // doesn't allow top-level row to be null, only its columns can 
be null.
+  AssertNotNull(r, Seq("top level Product or row object"))
+  }
+  nullSafeSerializer match {
+case If(_, _, s: CreateNamedStruct) => s
--- End diff --

Let's also make sure the if condition is `IsNull`, which better explains why 
we strip it (the object can't be null at this point).
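
Something like this stricter match, sketched from the block in the diff above:

```scala
nullSafeSerializer match {
  // Only strip the null check when the condition is explicitly IsNull; the
  // top-level object was already asserted non-null, so that branch is dead.
  case If(IsNull(_), _, s: CreateNamedStruct) => s
  case s: CreateNamedStruct => s
  case _ =>
    throw new RuntimeException(s"class $clsName has unexpected serializer: $objSerializer")
}
```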


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226296369
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ---
@@ -103,75 +88,61 @@ object ExpressionEncoder {
* name/positional binding is preserved.
*/
   def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = {
--- End diff --

cool, this method is simplified a lot with the new abstraction.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226295859
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ---
@@ -103,75 +88,61 @@ object ExpressionEncoder {
* name/positional binding is preserved.
*/
   def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = {
+if (encoders.length > 22) {
+  throw new RuntimeException("Can't construct a tuple encoder for more 
than 22 encoders.")
+}
+
 encoders.foreach(_.assertUnresolved())
 
 val schema = StructType(encoders.zipWithIndex.map {
   case (e, i) =>
-val (dataType, nullable) = if (e.flat) {
-  e.schema.head.dataType -> e.schema.head.nullable
-} else {
-  e.schema -> true
-}
-StructField(s"_${i + 1}", dataType, nullable)
+StructField(s"_${i + 1}", e.objSerializer.dataType, 
e.objSerializer.nullable)
 })
 
 val cls = 
Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}")
 
-val serializer = encoders.zipWithIndex.map { case (enc, index) =>
-  val originalInputObject = enc.serializer.head.collect { case b: 
BoundReference => b }.head
+val serializers = encoders.zipWithIndex.map { case (enc, index) =>
+  val boundRefs = enc.objSerializer.collect { case b: BoundReference 
=> b }.distinct
+  assert(boundRefs.size == 1, "object serializer should have only one 
bound reference but " +
+s"there are ${boundRefs.size}")
+
+  val originalInputObject = boundRefs.head
   val newInputObject = Invoke(
 BoundReference(0, ObjectType(cls), nullable = true),
 s"_${index + 1}",
-originalInputObject.dataType)
+originalInputObject.dataType,
+returnNullable = originalInputObject.nullable)
 
-  val newSerializer = enc.serializer.map(_.transformUp {
+  val newSerializer = enc.objSerializer.transformUp {
 case b: BoundReference if b == originalInputObject => 
newInputObject
--- End diff --

Since there is only one distinct `BoundReference`, we can just write `case 
b: BoundReference  => newInputObject`
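
i.e. a sketch of the simplified transform, given that `newInputObject` is 
built as in the diff above (using `_` since the binding isn't needed):

```scala
val newSerializer = enc.objSerializer.transformUp {
  // there is exactly one distinct BoundReference, so no comparison against
  // originalInputObject is needed before substituting the new input object
  case _: BoundReference => newInputObject
}
```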


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226294255
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ---
@@ -103,75 +88,61 @@ object ExpressionEncoder {
* name/positional binding is preserved.
*/
   def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = {
+if (encoders.length > 22) {
--- End diff --

Can we do it in a separate PR with a test?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226294017
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ---
@@ -43,10 +44,11 @@ import org.apache.spark.util.Utils
  *to the name `value`.
  */
 object ExpressionEncoder {
+
   def apply[T : TypeTag](): ExpressionEncoder[T] = {
-// We convert the not-serializable TypeTag into StructType and 
ClassTag.
 val mirror = ScalaReflection.mirror
-val tpe = typeTag[T].in(mirror).tpe
+val tpe = ScalaReflection.localTypeOf[T]
--- End diff --

why change it from `typeTag[T].in(mirror).tpe`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22721: [SPARK-25403][SQL] Refreshes the table after inserting t...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22721
  
I think it's reasonable to follow `InsertIntoHiveTable`, but it's better to 
provide more details about what changes in `InsertIntoHadoopFsRelationCommand`:
1. What's refreshed? Previously we refreshed the data cache via the path and 
also refreshed the file index, but the plan cache was left alone. Now we 
refresh the plan cache. Since the file index lives in the plan, we don't need 
to refresh it separately once we refresh the plan cache, but the data cache 
still needs to be refreshed (see the sketch below).
2. What's the performance impact? The plan cache is very useful when reading 
partitioned tables, to avoid listing files repeatedly. But it seems OK here, 
because we already refreshed the file index before, so we had to re-list 
files after insertion anyway.
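
As a quick reference for point 1, a rough sketch of the two refresh calls 
being compared; both appear in the diffs above, and the comments summarize the 
intended effect rather than the exact implementation:

```scala
// Refreshes the cached data for any Dataset that reads files under this path.
sparkSession.catalog.refreshByPath(outputPath.toString)

// Also drops the cached plan for the table (whose FileIndex lives inside the
// plan), in addition to refreshing the cached data for the table.
sparkSession.catalog.refreshTable(catalogTable.get.identifier.quotedString)
```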


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22721: [SPARK-25403][SQL] Refreshes the table after inse...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22721#discussion_r226280121
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 ---
@@ -189,6 +189,7 @@ case class InsertIntoHadoopFsRelationCommand(
   sparkSession.catalog.refreshByPath(outputPath.toString)
 
   if (catalogTable.nonEmpty) {
+
sparkSession.catalog.refreshTable(catalogTable.get.identifier.quotedString)
--- End diff --

shall we follow `InsertIntoHiveTable` and use 
`sparkSession.sessionState.catalog.refreshTable`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22756: [SPARK-25758][ML] Deprecate computeCost on BisectingKMea...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22756
  
LGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22547
  
Hi @rdblue welcome back! I just rebased it so it's ready for review :)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22721: [SPARK-25403][SQL] Refreshes the table after inse...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22721#discussion_r226208576
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 ---
@@ -189,6 +189,7 @@ case class InsertIntoHadoopFsRelationCommand(
   sparkSession.catalog.refreshByPath(outputPath.toString)
 
   if (catalogTable.nonEmpty) {
+
sparkSession.catalog.refreshTable(catalogTable.get.identifier.quotedString)
--- End diff --

shall we do
```
if (catalogTable.isDefined) {
  sparkSession.catalog.refreshTable(catalogTable.get.identifier.quotedString)
} else {
  // refresh cached files in FileIndex
  fileIndex.foreach(_.refresh())
  // refresh data cache if table is cached
  sparkSession.catalog.refreshByPath(outputPath.toString)
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22758#discussion_r226198591
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
   None)
 val logicalRelation = cached.getOrElse {
   val updatedTable = inferIfNeeded(relation, options, fileFormat)
+  // Intialize the catalogTable stats if its not defined.An intial 
value has to be defined
--- End diff --

> but after create table command, when we do insert command within the same 
session Hive statistics is not getting updated

This is the thing I don't understand. Like I said before, even if the table 
has no stats, Spark will still get stats via the `DetermineTableStats` rule.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r226156536
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala ---
@@ -393,4 +393,30 @@ class UDFSuite extends QueryTest with SharedSQLContext 
{
   checkAnswer(df, Seq(Row("12"), Row("24"), Row("3null"), Row(null)))
 }
   }
+
+  test("SPARK-25044 Verify null input handling for primitive types - with 
udf()") {
+val udf1 = udf({(x: Long, y: Any) => x * 2 + (if (y == null) 1 else 
0)})
+val df = spark.range(0, 3).toDF("a")
+  .withColumn("b", udf1($"a", lit(null)))
+  .withColumn("c", udf1(lit(null), $"a"))
+
+checkAnswer(
+  df,
+  Seq(
+Row(0, 1, null),
+Row(1, 3, null),
+Row(2, 5, null)))
+  }
+
+  test("SPARK-25044 Verify null input handling for primitive types - with 
udf.register") {
+withTable("t") {
--- End diff --

Ah, we should do `(null, new Integer(1), "x"), ("N", null, "y"), ("N", new 
Integer(3), null)`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r226156400
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1978,6 +1978,7 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
   - Since Spark 2.4, empty strings are saved as quoted empty strings `""`. 
In version 2.3 and earlier, empty strings are equal to `null` values and do not 
reflect to any characters in saved CSV files. For example, the row of `"a", 
null, "", 1` was writted as `a,,,1`. Since Spark 2.4, the same row is saved as 
`a,,"",1`. To restore the previous behavior, set the CSV option `emptyValue` to 
empty (not quoted) string.  
   - Since Spark 2.4, The LOAD DATA command supports wildcard `?` and `*`, 
which match any one character, and zero or more characters, respectively. 
Example: `LOAD DATA INPATH '/tmp/folder*/'` or `LOAD DATA INPATH 
'/tmp/part-?'`. Special Characters like `space` also now work in paths. 
Example: `LOAD DATA INPATH '/tmp/folder name/'`.
   - In Spark version 2.3 and earlier, HAVING without GROUP BY is treated 
as WHERE. This means, `SELECT 1 FROM range(10) HAVING true` is executed as 
`SELECT 1 FROM range(10) WHERE true`  and returns 10 rows. This violates SQL 
standard, and has been fixed in Spark 2.4. Since Spark 2.4, HAVING without 
GROUP BY is treated as a global aggregate, which means `SELECT 1 FROM range(10) 
HAVING true` will return only one row. To restore the previous behavior, set 
`spark.sql.legacy.parser.havingWithoutGroupByAsWhere` to `true`.
+  - Since Spark 2.4, use of the method `def udf(f: AnyRef, dataType: 
DataType): UserDefinedFunction` should not expect any automatic null handling 
of the input parameters, thus a null input of a Scala primitive type will be 
converted to the type's corresponding default value in the UDF. All other UDF 
declaration and registration methods remain the same behavior as before.
--- End diff --

yes, this fixes a bug introduced by a commit in 2.4


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


