[GitHub] spark issue #23143: [SPARK-24762][SQL][Followup] Enable Option of Product en...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23143
  
thanks, merging to master!


---




[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236514039
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java 
---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.reader.Scan;
+import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface representing a logical structured data set of a data 
source. For example, the
+ * implementation can be a directory on the file system, a topic of Kafka, 
or a table in the
+ * catalog, etc.
+ *
+ * This interface can mixin the following interfaces to support different 
operations:
+ * 
+ *   {@link SupportsBatchRead}: this table can be read in batch 
queries.
+ * 
+ */
+@Evolving
+public interface Table {
+
+  /**
+   * Returns the schema of this table.
+   */
+  StructType schema();
+
+  /**
+   * Returns a {@link ScanBuilder} which can be used to build a {@link 
Scan} later. Spark will call
+   * this method for each data scanning query.
+   *
+   * The builder can take some query specific information to do operators 
pushdown, and keep these
+   * information in the created {@link Scan}.
+   */
+  ScanBuilder newScanBuilder(DataSourceOptions options);
--- End diff --

I agree. Since `CaseInsensitiveStringMap` is not in the code base yet, shall we do it in a followup?


---




[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236513622
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java 
---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
--- End diff --

why does this `Table` API need to be in catalyst? It's not even a plan. We 
can define a table plan interface in catalyst, and implement it in the SQL 
module with this `Table` API.
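
A rough sketch of that split, with hypothetical names (not actual Spark classes), just to make the idea concrete:

```scala
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.types.StructType

// would live in catalyst: just enough for the analyzer/planner to reason about a table
trait AbstractTable {
  def schema: StructType
}

// would live in sql/core: bridges the data source v2 Table into the catalyst-side abstraction
class V2TableAdapter(v2Table: Table) extends AbstractTable {
  override def schema: StructType = v2Table.schema()
}
```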


---




[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and RDDScan...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23127
  
there are still 2 golden file test failures because of the plan change...


---




[GitHub] spark pull request #23138: [SPARK-23356][SQL][TEST] add new test cases for a...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23138#discussion_r236334056
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
 ---
@@ -196,4 +196,31 @@ class SetOperationSuite extends PlanTest {
   ))
 comparePlans(expectedPlan, rewrittenPlan)
   }
+
+  test("SPARK-23356 union: expressions with number in project list are 
pushed down") {
--- End diff --

`expressions with number` -> `expressions with literal`


---




[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23127#discussion_r236333530
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends 
CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not 
UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
+// Inline mutable state since an InputRDDCodegen is used once in a 
task for WholeStageCodegen
+val input = ctx.addMutableState("scala.collection.Iterator", "input", 
v => s"$v = inputs[0];",
+  forceInline = true)
+val row = ctx.freshName("row")
+
+val outputVars = if (createUnsafeProjection) {
+  // creating the vars will make the parent consume add an unsafe 
projection.
+  ctx.INPUT_ROW = row
+  ctx.currentVars = null
+  output.zipWithIndex.map { case (a, i) =>
+BoundReference(i, a.dataType, a.nullable).genCode(ctx)
+  }
+} else {
+  null
+}
+
+val numOutputRowsCode = if (metrics.contains("numOutputRows")) {
--- End diff --

how about `updateNumOutputRowsMetrics` instead of `numOutputRowsCode`?


---




[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23127#discussion_r236332786
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends 
CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not 
UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
--- End diff --

the extra `if createUnsafeProjection` check?


---




[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23127#discussion_r236332511
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends 
CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not 
UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
--- End diff --

can you highlight the difference between this one and the previous 
`RowDataSourceScanExec.doProduce`?


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236282401
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
+  }
+
+  // TODO: specialize it
+  private lazy val keys = mutable.ArrayBuffer.empty[Any]
+  private lazy val values = mutable.ArrayBuffer.empty[Any]
+
+  private lazy val keyGetter = InternalRow.getAccessor(keyType)
+  private lazy val valueGetter = InternalRow.getAccessor(valueType)
+
+  def reset(): Unit = {
+keyToIndex.clear()
+keys.clear()
+values.clear()
+  }
+
+  def put(key: Any, value: Any): Unit = {
+if (key == null) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+
+val maybeExistingIdx = keyToIndex.get(key)
+if (maybeExistingIdx.isDefined) {
+  // Overwrite the previous value, as the policy is last wins.
+  values(maybeExistingIdx.get) = value
+} else {
+  keyToIndex.put(key, values.length)
+  keys.append(key)
+  values.append(value)
+}
+  }
+
+  // write a 2-field row, the first field is key and the second field is 
value.
+  def put(entry: InternalRow): Unit = {
+if (entry.isNullAt(0)) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+put(keyGetter(entry, 0), valueGetter(entry, 1))
+  }
+
+  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
+if (keyArray.length != valueArray.length) {
+  throw new RuntimeException(
+"The key array and value array of MapData must have the same 
length.")
+}
+
+var i = 0
+while (i < keyArray.length) {
+  put(keyArray(i), valueArray(i))
+  i += 1
+}
+  }
+
+  def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = {
+if (keyArray.numElements() != valueArray.numElements()) {
+  throw new RuntimeException(
+"The key array and value array of MapData must have the same 
length.")
+}
+
+var i = 0
+while (i < keyArray.numElements()) {
+  put(keyGetter(keyArray, i), valueGetter(valueArray, i))
+  i += 1
+}
+  }
+
+  def build(): ArrayBasedMapData = {
+new ArrayBasedMapData(new GenericArrayData(keys.toArray), new 
GenericArrayData(values.toArray))
+  }
+
+  def from(keyArray: ArrayData, valueArray: ArrayData): ArrayBasedMapData 
= {
+assert(keyToIndex.isEmpty, "'from' can only be called with a fresh 
GenericMapBuilder.")
+putAll(keyArray, valueArray)
--- End diff --

No, we can't, as we still need to detect duplicated keys.


---


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236275822
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) 
extends ComplexTypeMergingExpres
 
   override def nullable: Boolean = children.exists(_.nullable)
 
+  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, 
dataType.valueType)
+
   override def eval(input: InternalRow): Any = {
-val maps = children.map(_.eval(input))
+val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray
--- End diff --

`toArray` is O(N) but we do it only once. If accessing by index is O(N), 
the total time complexity is O(N ^ 2).
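
A minimal illustration of the complexity argument, using plain Scala collections rather than the actual `MapData` code:

```scala
val xs: List[Int] = (1 to 10000).toList
val n = xs.length

// Indexed access on a List is O(N), so this loop is O(N^2) overall.
var i = 0
var slow = 0L
while (i < n) {
  slow += xs(i)      // walks the list from the head on every access
  i += 1
}

// One O(N) toArray up front makes each access O(1), so this loop is O(N).
val arr = xs.toArray
var j = 0
var fast = 0L
while (j < arr.length) {
  fast += arr(j)
  j += 1
}
```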


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236274428
  
--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -19,6 +19,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, users can create map values with map 
type key via built-in function like `CreateMap`, `MapFromArrays`, etc. Since 
Spark 3.0, it's not allowed to create map values with map type key with these 
built-in functions. Users can still read map values with map type key from data 
source or Java/Scala collections, though they are not very useful.
 
+  - In Spark version 2.4 and earlier, users can create a map with 
duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. 
The behavior of map with duplicated keys is undefined, e.g. map look up 
respects the duplicated key appears first, `Dataset.collect` only keeps the 
duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since 
Spark 3.0, these built-in functions will remove duplicated map keys with last 
wins policy.
--- End diff --

They are related, but they are not the same. For example, we don't support map type as a key, because we can't check equality of map types correctly. This is just a current implementation limitation, and we may relax it in the future.

Duplicated map keys are a real problem, and we will never allow them.
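
A small example of the difference, as a sketch (assumes a `spark` session; the 2.4 behavior is as described in the migration note above, not re-verified here):

```scala
// map type as key: rejected by the map-building functions since Spark 3.0
// spark.sql("SELECT map(map(1, 2), 'a')")   // fails; a current implementation limitation

// duplicated keys: still accepted, but deduplicated with the last-wins policy since 3.0
val df = spark.sql("SELECT map(1, 'a', 1, 'b') AS m")
df.collect()
// Spark 3.0: Map(1 -> "b")
// Spark 2.4: undefined behavior (collect kept the last value, but map lookup used the first)
```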


---




[GitHub] spark pull request #23143: [SPARK-24762][SQL][Followup] Enable Option of Pro...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23143#discussion_r236267692
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ---
@@ -251,19 +251,15 @@ case class ExpressionEncoder[T](
*/
   def isSerializedAsStruct: Boolean = 
objSerializer.dataType.isInstanceOf[StructType]
 
-  /**
-   * Returns true if the type `T` is an `Option` type.
-   */
-  def isOptionType: Boolean = 
classOf[Option[_]].isAssignableFrom(clsTag.runtimeClass)
-
   /**
* If the type `T` is serialized as a struct, when it is encoded to a 
Spark SQL row, fields in
* the struct are naturally mapped to top-level columns in a row. In 
other words, the serialized
* struct is flattened to row. But in case of the `T` is also an 
`Option` type, it can't be
* flattened to top-level row, because in Spark SQL top-level row can't 
be null. This method
* returns true if `T` is serialized as struct and is not `Option` type.
*/
-  def isSerializedAsStructForTopLevel: Boolean = isSerializedAsStruct && 
!isOptionType
+  def isSerializedAsStructForTopLevel: Boolean = isSerializedAsStruct &&
--- End diff --

nit: since it cross lines, I'd prefer
```
def xxx = {
  xxx
}
```


---




[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23127#discussion_r236151394
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
@@ -199,4 +199,6 @@ case class RDDScanExec(
   override def simpleString: String = {
 s"$nodeName${truncatedString(output, "[", ",", "]")}"
   }
+
+  override def inputRDD: RDD[InternalRow] = rdd
--- End diff --

The test failure is real: `RDDScanExec.rdd` may not be an RDD of unsafe rows. Maybe we should enforce that `InputRDDCodegen.inputRDD` is `RDD[UnsafeRow]`.
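
One way to enforce that, as a sketch (not necessarily the PR's eventual fix), is the usual scan-node pattern of running the rows through an `UnsafeProjection`:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, UnsafeRow}

// Convert a possibly-safe RDD[InternalRow] into unsafe rows before codegen consumes it.
def toUnsafe(rdd: RDD[InternalRow], output: Seq[Attribute]): RDD[UnsafeRow] = {
  rdd.mapPartitions { iter =>
    val proj = UnsafeProjection.create(output, output)
    // the projection writes into a reused buffer, which is fine for iterator-style consumers
    iter.map(row => proj(row))
  }
}
```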


---




[GitHub] spark pull request #23139: [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule Repla...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23139#discussion_r236150260
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, 
ArrayFilter, CaseWhen, Expression, If}
+import org.apache.spark.sql.catalyst.expressions.{LambdaFunction, Literal, 
MapFilter, Or}
+import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.types.BooleanType
+
+
+/**
+ * A rule that replaces `Literal(null, BooleanType)` with `FalseLiteral`, 
if possible, in the search
+ * condition of the WHERE/HAVING/ON(JOIN) clauses, which contain an 
implicit Boolean operator
+ * "(search condition) = TRUE". The replacement is only valid when 
`Literal(null, BooleanType)` is
+ * semantically equivalent to `FalseLiteral` when evaluating the whole 
search condition.
+ *
+ * Please note that FALSE and NULL are not exchangeable in most cases, 
when the search condition
+ * contains NOT and NULL-tolerant expressions. Thus, the rule is very 
conservative and applicable
+ * in very limited cases.
+ *
+ * For example, `Filter(Literal(null, BooleanType))` is equal to 
`Filter(FalseLiteral)`.
+ *
+ * Another example containing branches is `Filter(If(cond, FalseLiteral, 
Literal(null, _)))`;
+ * this can be optimized to `Filter(If(cond, FalseLiteral, 
FalseLiteral))`, and eventually
+ * `Filter(FalseLiteral)`.
+ *
+ * Moreover, this rule also transforms predicates in all [[If]] 
expressions as well as branch
+ * conditions in all [[CaseWhen]] expressions, even if they are not part 
of the search conditions.
+ *
+ * For example, `Project(If(And(cond, Literal(null)), Literal(1), 
Literal(2)))` can be simplified
+ * into `Project(Literal(2))`.
+ */
+object ReplaceNullWithFalseInPredicate extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+case f @ Filter(cond, _) => f.copy(condition = 
replaceNullWithFalse(cond))
+case j @ Join(_, _, _, Some(cond)) => j.copy(condition = 
Some(replaceNullWithFalse(cond)))
+case p: LogicalPlan => p transformExpressions {
+  case i @ If(pred, _, _) => i.copy(predicate = 
replaceNullWithFalse(pred))
+  case cw @ CaseWhen(branches, _) =>
+val newBranches = branches.map { case (cond, value) =>
+  replaceNullWithFalse(cond) -> value
+}
+cw.copy(branches = newBranches)
+  case af @ ArrayFilter(_, lf @ LambdaFunction(func, _, _)) =>
+val newLambda = lf.copy(function = replaceNullWithFalse(func))
+af.copy(function = newLambda)
+  case ae @ ArrayExists(_, lf @ LambdaFunction(func, _, _)) =>
+val newLambda = lf.copy(function = replaceNullWithFalse(func))
+ae.copy(function = newLambda)
+  case mf @ MapFilter(_, lf @ LambdaFunction(func, _, _)) =>
+val newLambda = lf.copy(function = replaceNullWithFalse(func))
+mf.copy(function = newLambda)
+}
+  }
+
+  /**
+   * Recursively traverse the Boolean-type expression to replace
+   * `Literal(null, BooleanType)` with `FalseLiteral`, if possible.
+   *
+   * Note that `transformExpressionsDown` can not be used here as we must 
stop as soon as we hit
+   * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or
+   * `Literal(null, BooleanType)`.
+   */
+  private def replaceNullWithFalse(e: Expression): Expression = {
+if (e.dataType != BooleanType) {
--- End diff --

We don't 

[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23130#discussion_r236149203
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -388,7 +388,7 @@ case class FileSourceScanExec(
 logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
 val filesGroupedToBuckets =
   selectedPartitions.flatMap { p =>
-p.files.map { f =>
+p.files.filter(_.getLen > 0).map { f =>
--- End diff --

do you mean changing `filter...map...` to `flatMap`? I don't have a strong 
preference about it.

The updated test cases and the new test case are for this change.
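
Just to make the two forms concrete, a stand-in sketch (hypothetical `FileEntry` type, not the real Hadoop/Spark classes):

```scala
case class FileEntry(path: String, getLen: Long)
val files = Seq(FileEntry("a", 0L), FileEntry("b", 10L))

// filter + map, as in the PR: skip empty files, then transform the rest
val nonEmpty1 = files.filter(_.getLen > 0).map(_.path)

// flatMap, the alternative being asked about: emit nothing for empty files
val nonEmpty2 = files.flatMap(f => if (f.getLen > 0) Some(f.path) else None)

assert(nonEmpty1 == nonEmpty2)   // both yield Seq("b")
```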


---




[GitHub] spark issue #23135: [SPARK-26168][SQL] Update the code comments in Expressio...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23135
  
thanks, merging to master!


---




[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter's readingIterator fie...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23083
  
retest this please


---




[GitHub] spark issue #23137: [SPARK-26169] Create DataFrameSetOperationsSuite

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23137
  
thanks, merging to master!


---




[GitHub] spark pull request #23141: [SPARK-26021][SQL][followup] add test for special...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23141#discussion_r236143942
  
--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -17,14 +17,16 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Since Spark 3.0, the `from_json` functions supports two modes - 
`PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The 
default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` 
did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing 
of malformed JSON records. For example, the JSON string `{"a" 1}` with the 
schema `a INT` is converted to `null` by previous versions but Spark 3.0 
converts it to `Row(null)`.
 
-  - In Spark version 2.4 and earlier, the `from_json` function produces 
`null`s for JSON strings and JSON datasource skips the same independetly of its 
mode if there is no valid root JSON token in its input (` ` for example). Since 
Spark 3.0, such input is treated as a bad record and handled according to 
specified mode. For example, in the `PERMISSIVE` mode the ` ` input is 
converted to `Row(null, null)` if specified schema is `key STRING, value INT`. 
+  - In Spark version 2.4 and earlier, the `from_json` function produces 
`null`s for JSON strings and JSON datasource skips the same independetly of its 
mode if there is no valid root JSON token in its input (` ` for example). Since 
Spark 3.0, such input is treated as a bad record and handled according to 
specified mode. For example, in the `PERMISSIVE` mode the ` ` input is 
converted to `Row(null, null)` if specified schema is `key STRING, value INT`.
 
   - The `ADD JAR` command previously returned a result set with the single 
value 0. It now returns an empty result set.
 
   - In Spark version 2.4 and earlier, users can create map values with map 
type key via built-in function like `CreateMap`, `MapFromArrays`, etc. Since 
Spark 3.0, it's not allowed to create map values with map type key with these 
built-in functions. Users can still read map values with map type key from data 
source or Java/Scala collections, though they are not very useful.
-  
+
   - In Spark version 2.4 and earlier, `Dataset.groupByKey` results to a 
grouped dataset with key attribute wrongly named as "value", if the key is 
non-struct type, e.g. int, string, array, etc. This is counterintuitive and 
makes the schema of aggregation queries weird. For example, the schema of 
`ds.groupByKey(...).count()` is `(value, count)`. Since Spark 3.0, we name the 
grouping attribute to "key". The old behaviour is preserved under a newly added 
configuration `spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue` with a 
default value of `false`.
 
+  - In Spark version 2.4 and earlier, float/double -0.0 is semantically 
equal to 0.0, but users can still distinguish them via `Dataset.show`, 
`Dataset.collect` etc. Since Spark 3.0, float/double -0.0 is replaced by 0.0 
internally, and users can't distinguish them any more.
--- End diff --

I checked Presto and Postgres; the behaviors are the same. Hive distinguishes -0.0 and 0.0, but it has the group-by bug.
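
A sketch of the user-visible difference described in the note (assumes a `spark` session; the per-version outputs are as described above, not re-verified here):

```scala
import spark.implicits._

val df = Seq(0.0, -0.0).toDF("d")
df.show()
// Spark 2.4: the two rows display as 0.0 and -0.0, so they are distinguishable
// Spark 3.0: -0.0 is replaced by 0.0 internally, so both rows display as 0.0
```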


---




[GitHub] spark issue #23141: [SPARK-26021][SQL][followup] add test for special floati...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23141
  
cc @adoron @kiszk @viirya


---




[GitHub] spark pull request #23141: [SPARK-26021][SQL][followup] add test for special...

2018-11-25 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/23141

[SPARK-26021][SQL][followup] add test for special floating point values

## What changes were proposed in this pull request?

A followup of https://github.com/apache/spark/pull/23124. Add a test to show the minor behavior change introduced by #23124, and add a migration guide.

## How was this patch tested?

a new test

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark follow

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23141.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23141


commit 8a9103c47931eb61cb329ece046d5efc50e855c2
Author: Wenchen Fan 
Date:   2018-11-26T06:11:09Z

add test for special floating point values




---




[GitHub] spark pull request #23104: [SPARK-26138][SQL] LimitPushDown cross join requi...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23104#discussion_r236118983
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -459,6 +459,7 @@ object LimitPushDown extends Rule[LogicalPlan] {
   val newJoin = joinType match {
 case RightOuter => join.copy(right = maybePushLocalLimit(exp, 
right))
 case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left))
+case Cross => join.copy(left = maybePushLocalLimit(exp, left), 
right = maybePushLocalLimit(exp, right))
--- End diff --

An inner join without a condition is literally a cross join.


---




[GitHub] spark pull request #23139: [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule Repla...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23139#discussion_r236118914
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, 
ArrayFilter, CaseWhen, Expression, If}
+import org.apache.spark.sql.catalyst.expressions.{LambdaFunction, Literal, 
MapFilter, Or}
+import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.types.BooleanType
+
+
+/**
+ * A rule that replaces `Literal(null, BooleanType)` with `FalseLiteral`, 
if possible, in the search
+ * condition of the WHERE/HAVING/ON(JOIN) clauses, which contain an 
implicit Boolean operator
+ * "(search condition) = TRUE". The replacement is only valid when 
`Literal(null, BooleanType)` is
+ * semantically equivalent to `FalseLiteral` when evaluating the whole 
search condition.
+ *
+ * Please note that FALSE and NULL are not exchangeable in most cases, 
when the search condition
+ * contains NOT and NULL-tolerant expressions. Thus, the rule is very 
conservative and applicable
+ * in very limited cases.
+ *
+ * For example, `Filter(Literal(null, BooleanType))` is equal to 
`Filter(FalseLiteral)`.
+ *
+ * Another example containing branches is `Filter(If(cond, FalseLiteral, 
Literal(null, _)))`;
+ * this can be optimized to `Filter(If(cond, FalseLiteral, 
FalseLiteral))`, and eventually
+ * `Filter(FalseLiteral)`.
+ *
+ * Moreover, this rule also transforms predicates in all [[If]] 
expressions as well as branch
+ * conditions in all [[CaseWhen]] expressions, even if they are not part 
of the search conditions.
+ *
+ * For example, `Project(If(And(cond, Literal(null)), Literal(1), 
Literal(2)))` can be simplified
+ * into `Project(Literal(2))`.
+ */
+object ReplaceNullWithFalseInPredicate extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+case f @ Filter(cond, _) => f.copy(condition = 
replaceNullWithFalse(cond))
+case j @ Join(_, _, _, Some(cond)) => j.copy(condition = 
Some(replaceNullWithFalse(cond)))
+case p: LogicalPlan => p transformExpressions {
+  case i @ If(pred, _, _) => i.copy(predicate = 
replaceNullWithFalse(pred))
+  case cw @ CaseWhen(branches, _) =>
+val newBranches = branches.map { case (cond, value) =>
+  replaceNullWithFalse(cond) -> value
+}
+cw.copy(branches = newBranches)
+  case af @ ArrayFilter(_, lf @ LambdaFunction(func, _, _)) =>
+val newLambda = lf.copy(function = replaceNullWithFalse(func))
+af.copy(function = newLambda)
+  case ae @ ArrayExists(_, lf @ LambdaFunction(func, _, _)) =>
+val newLambda = lf.copy(function = replaceNullWithFalse(func))
+ae.copy(function = newLambda)
+  case mf @ MapFilter(_, lf @ LambdaFunction(func, _, _)) =>
+val newLambda = lf.copy(function = replaceNullWithFalse(func))
+mf.copy(function = newLambda)
+}
+  }
+
+  /**
+   * Recursively traverse the Boolean-type expression to replace
+   * `Literal(null, BooleanType)` with `FalseLiteral`, if possible.
+   *
+   * Note that `transformExpressionsDown` can not be used here as we must 
stop as soon as we hit
+   * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or
+   * `Literal(null, BooleanType)`.
+   */
+  private def replaceNullWithFalse(e: Expression): Expression = {
+if (e.dataType != BooleanType) {
--- End diff --

do we nee

[GitHub] spark issue #23135: [SPARK-26168][SQL] Update the code comments in Expressio...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23135
  
LGTM


---




[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and RDDScan...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23127
  
retest this please


---




[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and RDDScan...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23127
  
LGTM


---




[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23127#discussion_r236118569
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -350,6 +350,15 @@ trait CodegenSupport extends SparkPlan {
*/
   def needStopCheck: Boolean = parent.needStopCheck
 
+  /**
+   * Helper default should stop check code.
+   */
+  def shouldStopCheckCode: String = if (needStopCheck) {
--- End diff --

We can use it in more places. This can be done in a followup.


---




[GitHub] spark pull request #23104: [SPARK-26138][SQL] LimitPushDown cross join requi...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23104#discussion_r236114831
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -459,6 +459,7 @@ object LimitPushDown extends Rule[LogicalPlan] {
   val newJoin = joinType match {
 case RightOuter => join.copy(right = maybePushLocalLimit(exp, 
right))
 case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left))
+case Cross => join.copy(left = maybePushLocalLimit(exp, left), 
right = maybePushLocalLimit(exp, right))
--- End diff --

Maybe we can match `InnerLike` when the condition is empty.
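
As a sketch, the suggested match could look like this, reusing the names visible in the quoted diff (`joinType`, `join`, `exp`, `left`, `right`, `maybePushLocalLimit`); this is a fragment of the rule, not the merged code:

```scala
val newJoin = joinType match {
  case RightOuter => join.copy(right = maybePushLocalLimit(exp, right))
  case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left))
  // covers both CROSS JOIN and INNER JOIN without a condition
  case _: InnerLike if join.condition.isEmpty =>
    join.copy(
      left = maybePushLocalLimit(exp, left),
      right = maybePushLocalLimit(exp, right))
  case _ => join
}
```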


---




[GitHub] spark pull request #23104: [SPARK-26138][SQL] LimitPushDown cross join requi...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23104#discussion_r236114751
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -459,6 +459,7 @@ object LimitPushDown extends Rule[LogicalPlan] {
   val newJoin = joinType match {
 case RightOuter => join.copy(right = maybePushLocalLimit(exp, 
right))
 case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left))
+case Cross => join.copy(left = maybePushLocalLimit(exp, left), 
right = maybePushLocalLimit(exp, right))
--- End diff --

how about inner join without condition?


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236112390
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -646,34 +633,35 @@ case class MapConcat(children: Seq[Expression]) 
extends ComplexTypeMergingExpres
 
 val mapMerge =
   s"""
-|${ev.isNull} = $hasNullName;
-|if (!${ev.isNull}) {
-|  $arrayDataClass[] $keyArgsName = new 
$arrayDataClass[${mapCodes.size}];
-|  $arrayDataClass[] $valArgsName = new 
$arrayDataClass[${mapCodes.size}];
-|  long $numElementsName = 0;
-|  for (int $idxName = 0; $idxName < $argsName.length; $idxName++) 
{
-|$keyArgsName[$idxName] = $argsName[$idxName].keyArray();
-|$valArgsName[$idxName] = $argsName[$idxName].valueArray();
-|$numElementsName += $argsName[$idxName].numElements();
-|  }
-|  if ($numElementsName > 
${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
-|throw new RuntimeException("Unsuccessful attempt to concat 
maps with " +
-|   $numElementsName + " elements due to exceeding the map 
size limit " +
-|   "${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.");
-|  }
-|  $arrayDataClass $finKeysName = $keyConcat($keyArgsName,
-|(int) $numElementsName);
-|  $arrayDataClass $finValsName = $valueConcat($valArgsName,
-|(int) $numElementsName);
-|  ${ev.value} = new $arrayBasedMapDataClass($finKeysName, 
$finValsName);
+|ArrayData[] $keyArgsName = new ArrayData[${mapCodes.size}];
+|ArrayData[] $valArgsName = new ArrayData[${mapCodes.size}];
+|long $numElementsName = 0;
+|for (int $idxName = 0; $idxName < $argsName.length; $idxName++) {
+|  $keyArgsName[$idxName] = $argsName[$idxName].keyArray();
+|  $valArgsName[$idxName] = $argsName[$idxName].valueArray();
+|  $numElementsName += $argsName[$idxName].numElements();
 |}
+|if ($numElementsName > 
${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
--- End diff --

Yup. I actually did what you proposed at first, and then realized it's different from before and may introduce a perf regression. We can investigate it in a followup.


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236112256
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) 
extends ComplexTypeMergingExpres
 
   override def nullable: Boolean = children.exists(_.nullable)
 
+  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, 
dataType.valueType)
+
   override def eval(input: InternalRow): Any = {
-val maps = children.map(_.eval(input))
+val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray
--- End diff --

BTW, if that's no longer true with Scala 2.12, we should update them together with a benchmark, instead of only updating this single one.


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236112177
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) 
extends ComplexTypeMergingExpres
 
   override def nullable: Boolean = children.exists(_.nullable)
 
+  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, 
dataType.valueType)
+
   override def eval(input: InternalRow): Any = {
-val maps = children.map(_.eval(input))
+val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray
--- End diff --

In Scala, a while loop is faster than `foreach`. If you look at the `Expression.eval` implementations, we use while loops a lot, even where `foreach` would produce simpler code.
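
The hot-loop pattern being referred to, as a minimal self-contained example:

```scala
// Index-based while loop over an Array: no closure allocation or per-element dispatch,
// which is why Expression.eval implementations prefer it over foreach.
val values: Array[Long] = Array.fill(1000)(1L)

var total = 0L
var i = 0
while (i < values.length) {
  total += values(i)
  i += 1
}
```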


---




[GitHub] spark pull request #21732: [SPARK-24762][SQL] Enable Option of Product encod...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21732#discussion_r236111929
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ---
@@ -253,10 +247,24 @@ case class ExpressionEncoder[T](
   })
 
   /**
-   * Returns true if the type `T` is serialized as a struct.
+   * Returns true if the type `T` is serialized as a struct by 
`objSerializer`.
*/
   def isSerializedAsStruct: Boolean = 
objSerializer.dataType.isInstanceOf[StructType]
 
+  /**
+   * Returns true if the type `T` is an `Option` type.
+   */
+  def isOptionType: Boolean = 
classOf[Option[_]].isAssignableFrom(clsTag.runtimeClass)
+
+  /**
+   * If the type `T` is serialized as a struct, when it is encoded to a 
Spark SQL row, fields in
+   * the struct are naturally mapped to top-level columns in a row. In 
other words, the serialized
+   * struct is flattened to row. But in case of the `T` is also an 
`Option` type, it can't be
+   * flattened to top-level row, because in Spark SQL top-level row can't 
be null. This method
+   * returns true if `T` is serialized as struct and is not `Option` type.
+   */
+  def isSerializedAsStructForTopLevel: Boolean = isSerializedAsStruct && 
!isOptionType
--- End diff --

can you send a followup PR to inline `isOptionType` if it's only used here?


---




[GitHub] spark issue #21732: [SPARK-24762][SQL] Enable Option of Product encoders

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/21732
  
thanks, merging to master, great work!


---




[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter's readingIterator fie...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23083
  
LGTM, thanks for your great work!


---




[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter's readingIterator fie...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23083
  
ok to test


---




[GitHub] spark issue #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23130
  
The code change LGTM. There is a mistake in the PR description: we updated `FileSourceScanExec`, not `DataSourceScanExec`. Let's also mention that this fixes a behavior change mistakenly introduced by https://github.com/apache/spark/pull/22938.


---




[GitHub] spark issue #22512: [SPARK-25498][SQL] InterpretedMutableProjection should h...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22512
  
@maropu are you still working on it?


---




[GitHub] spark issue #23137: [SPARK-26169] Create DataFrameSetOperationsSuite

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23137
  
LGTM, waiting for the conflicts to be resolved.


---




[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23128#discussion_r236108610
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -82,6 +82,14 @@ object SQLMetrics {
 
   private val baseForAvgMetric: Int = 10
 
+  val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
+  val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
+  val REMOTE_BYTES_READ = "remoteBytesRead"
+  val REMOTE_BYTES_READ_TO_DISK = "remoteBytesReadToDisk"
+  val LOCAL_BYTES_READ = "localBytesRead"
+  val FETCH_WAIT_TIME = "fetchWaitTime"
+  val RECORDS_READ = "recordsRead"
--- End diff --

Is there an easy way to sync this list with `ShuffleReadMetrics` instead of 
doing it manually?


---




[GitHub] spark issue #23131: [SPARK-25908][SQL][FOLLOW-UP] Add back unionAll

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23131
  
Shall we say `union` is an alias of `unionAll`, instead of `unionAll` being an alias of `union`? According to the SQL spec, `unionAll` is implemented correctly in that it keeps duplicated rows, while `union` does not follow the SQL spec; it's too widely used and it's too late to change its behavior.
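
A short illustration of the semantics described above (assumes a `spark` session):

```scala
import spark.implicits._

val df1 = Seq(1, 2).toDF("c")
val df2 = Seq(2, 3).toDF("c")

// Dataset.union (and unionAll) keep duplicates, matching SQL's UNION ALL
df1.union(df2).count()              // 4 rows: 1, 2, 2, 3
// SQL's deduplicating UNION has to be expressed explicitly
df1.union(df2).distinct().count()   // 3 rows: 1, 2, 3
```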


---




[GitHub] spark pull request #23135: [SPARK-26168][SQL] Update the code comments in Ex...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23135#discussion_r236106495
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 ---
@@ -43,9 +43,24 @@ import org.apache.spark.sql.types._
  * There are a few important traits:
  *
  * - [[Nondeterministic]]: an expression that is not deterministic.
+ * - [[Stateful]]: an expression that contains mutable state. For example, 
MonotonicallyIncreasingID
+ * and Rand. A stateful expression is always 
non-deterministic.
  * - [[Unevaluable]]: an expression that is not supposed to be evaluated.
  * - [[CodegenFallback]]: an expression that does not have code gen 
implemented and falls back to
  *interpreted mode.
+ * - [[NullIntolerant]]: an expression that is null intolerant (i.e. any 
null input will result in
+ *   null output).
+ * - [[NonSQLExpression]]: a common base trait for the expressions that 
doesn't have SQL
+ * expressions like representation. For example, 
`ScalaUDF`, `ScalaUDAF`,
+ * and object `MapObjects` and `Invoke`.
+ * - [[UserDefinedExpression]]: a common base trait for user-defined 
functions, including
+ *  UDF/UDAF/UDTF.
+ * - [[HigherOrderFunction]]: a common base trait for higher order 
functions that take one or more
+ *(lambda) functions and applies these to some 
objects. The function
+ *produces a number of variables which can be 
consumed by some lambda
+ *function.
+ * - [[NamedExpression]]: An [[Expression]] that is named.
+ * - [[TimeZoneAwareExpression]]: A common base trait for time zone aware 
expressions.
--- End diff --

shall we also mention `SubqueryExpression`?


---




[GitHub] spark issue #23084: [SPARK-26117][CORE][SQL]use SparkOutOfMemoryError instea...

2018-11-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23084
  
thanks, merging to master!


---




[GitHub] spark issue #23084: [SPARK-26117][CORE][SQL]use SparkOutOfMemoryError instea...

2018-11-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23084
  
I think it's safer to only catch the Spark-thrown OOM, not the system OOM, so LGTM.
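
A sketch of the distinction, assuming the `org.apache.spark.memory.SparkOutOfMemoryError` that the PR title refers to (which extends `java.lang.OutOfMemoryError`); the surrounding method is hypothetical:

```scala
import org.apache.spark.memory.SparkOutOfMemoryError

def tryAcquire(acquire: () => Long): Long = {
  try {
    acquire()
  } catch {
    // only the OOM that Spark's memory manager raised itself, which the caller can
    // recover from (e.g. by spilling); a genuine JVM OutOfMemoryError still propagates
    case _: SparkOutOfMemoryError => 0L
  }
}
```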


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235929748
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
--- End diff --

I think we should fail it at the analyzer phase, and other map-producing functions should do it as well. Can you create a JIRA for it? Thanks!


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235929210
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
+  }
+
+  // TODO: specialize it
+  private lazy val keys = mutable.ArrayBuffer.empty[Any]
+  private lazy val values = mutable.ArrayBuffer.empty[Any]
+
+  private lazy val keyGetter = InternalRow.getAccessor(keyType)
+  private lazy val valueGetter = InternalRow.getAccessor(valueType)
+
+  def reset(): Unit = {
+keyToIndex.clear()
+keys.clear()
+values.clear()
+  }
+
+  def put(key: Any, value: Any): Unit = {
+if (key == null) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+
+val maybeExistingIdx = keyToIndex.get(key)
+if (maybeExistingIdx.isDefined) {
+  // Overwrite the previous value, as the policy is last wins.
+  values(maybeExistingIdx.get) = value
+} else {
+  keyToIndex.put(key, values.length)
+  keys.append(key)
+  values.append(value)
+}
+  }
+
+  // write a 2-field row, the first field is key and the second field is 
value.
+  def put(entry: InternalRow): Unit = {
+if (entry.isNullAt(0)) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+put(keyGetter(entry, 0), valueGetter(entry, 1))
+  }
+
+  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
--- End diff --

ah good catch!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235928954
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
+  }
+
+  // TODO: specialize it
+  private lazy val keys = mutable.ArrayBuffer.empty[Any]
+  private lazy val values = mutable.ArrayBuffer.empty[Any]
+
+  private lazy val keyGetter = InternalRow.getAccessor(keyType)
+  private lazy val valueGetter = InternalRow.getAccessor(valueType)
+
+  def reset(): Unit = {
+keyToIndex.clear()
+keys.clear()
+values.clear()
+  }
+
+  def put(key: Any, value: Any): Unit = {
+if (key == null) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+
+val maybeExistingIdx = keyToIndex.get(key)
+if (maybeExistingIdx.isDefined) {
+  // Overwrite the previous value, as the policy is last wins.
+  values(maybeExistingIdx.get) = value
+} else {
+  keyToIndex.put(key, values.length)
+  keys.append(key)
+  values.append(value)
+}
+  }
+
+  // write a 2-field row, the first field is key and the second field is 
value.
+  def put(entry: InternalRow): Unit = {
+if (entry.isNullAt(0)) {
--- End diff --

There are 2 put methods that have this null check, and all other put methods 
go through them.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235928588
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -646,34 +633,35 @@ case class MapConcat(children: Seq[Expression]) 
extends ComplexTypeMergingExpres
 
 val mapMerge =
   s"""
-|${ev.isNull} = $hasNullName;
-|if (!${ev.isNull}) {
-|  $arrayDataClass[] $keyArgsName = new 
$arrayDataClass[${mapCodes.size}];
-|  $arrayDataClass[] $valArgsName = new 
$arrayDataClass[${mapCodes.size}];
-|  long $numElementsName = 0;
-|  for (int $idxName = 0; $idxName < $argsName.length; $idxName++) 
{
-|$keyArgsName[$idxName] = $argsName[$idxName].keyArray();
-|$valArgsName[$idxName] = $argsName[$idxName].valueArray();
-|$numElementsName += $argsName[$idxName].numElements();
-|  }
-|  if ($numElementsName > 
${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
-|throw new RuntimeException("Unsuccessful attempt to concat 
maps with " +
-|   $numElementsName + " elements due to exceeding the map 
size limit " +
-|   "${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.");
-|  }
-|  $arrayDataClass $finKeysName = $keyConcat($keyArgsName,
-|(int) $numElementsName);
-|  $arrayDataClass $finValsName = $valueConcat($valArgsName,
-|(int) $numElementsName);
-|  ${ev.value} = new $arrayBasedMapDataClass($finKeysName, 
$finValsName);
+|ArrayData[] $keyArgsName = new ArrayData[${mapCodes.size}];
+|ArrayData[] $valArgsName = new ArrayData[${mapCodes.size}];
+|long $numElementsName = 0;
+|for (int $idxName = 0; $idxName < $argsName.length; $idxName++) {
+|  $keyArgsName[$idxName] = $argsName[$idxName].keyArray();
+|  $valArgsName[$idxName] = $argsName[$idxName].valueArray();
+|  $numElementsName += $argsName[$idxName].numElements();
 |}
+|if ($numElementsName > 
${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
--- End diff --

This check is done before the `putAll`, so that it can fail fast. I think 
it's fine to ignore duplicated keys here, to be more conservative. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235927895
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) 
extends ComplexTypeMergingExpres
 
   override def nullable: Boolean = children.exists(_.nullable)
 
+  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, 
dataType.valueType)
+
   override def eval(input: InternalRow): Any = {
-val maps = children.map(_.eval(input))
+val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray
--- End diff --

I need to access it by index below, so I turn it into an array to guarantee 
that the access is O(1).
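
A generic illustration of why the indexed access matters (plain Scala, not Spark code):

```scala
// A Seq produced by `children.map(...)` may be a List, whose apply(i) walks i elements;
// converting to an Array makes every index lookup a constant-time operation.
val asList: Seq[Int] = List.tabulate(1000)(identity)
val asArray: Array[Int] = asList.toArray
assert(asList(999) == asArray(999)) // same value, but the List lookup is O(n)
```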


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter Leak

2018-11-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23083
  
Looking at the code, we are trying to fix 2 memory leaks: the task 
completion listener in `ShuffleBlockFetcherIterator`, and the 
`CompletionIterator`. If that's the case, can you say so in the PR description?

For the task completion listener, I think it's overkill to introduce a 
new API. Do we know exactly where we leak the memory? And can we null it out 
when the `ShuffleBlockFetcherIterator` reaches its end?
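
If "null it out" turns out to be the right fix, a generic sketch of the idea (not the actual Spark classes; names are illustrative) could look like this:

```scala
// An iterator wrapper that runs a cleanup action and drops its reference to the
// underlying iterator as soon as it is exhausted, instead of waiting for the
// task-completion listener to release the memory.
class SelfCleaningIterator[A](private var underlying: Iterator[A], cleanup: () => Unit)
  extends Iterator[A] {

  override def hasNext: Boolean = {
    if (underlying == null) {
      false
    } else if (underlying.hasNext) {
      true
    } else {
      cleanup()
      underlying = null // allow the buffers held by the iterator to be garbage collected
      false
    }
  }

  override def next(): A = {
    if (!hasNext) throw new NoSuchElementException("next on an exhausted iterator")
    underlying.next()
  }
}
```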


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235849825
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -751,171 +739,46 @@ case class MapFromEntries(child: Expression) extends 
UnaryExpression {
   s"${child.dataType.catalogString} type. $prettyName accepts only 
arrays of pair structs.")
   }
 
+  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, 
dataType.valueType)
+
   override protected def nullSafeEval(input: Any): Any = {
-val arrayData = input.asInstanceOf[ArrayData]
-val numEntries = arrayData.numElements()
+val entries = input.asInstanceOf[ArrayData]
+val numEntries = entries.numElements()
 var i = 0
-if(nullEntries) {
+if (nullEntries) {
   while (i < numEntries) {
-if (arrayData.isNullAt(i)) return null
+if (entries.isNullAt(i)) return null
 i += 1
   }
 }
-val keyArray = new Array[AnyRef](numEntries)
-val valueArray = new Array[AnyRef](numEntries)
+
+mapBuilder.reset()
 i = 0
 while (i < numEntries) {
-  val entry = arrayData.getStruct(i, 2)
-  val key = entry.get(0, dataType.keyType)
-  if (key == null) {
-throw new RuntimeException("The first field from a struct (key) 
can't be null.")
-  }
-  keyArray.update(i, key)
-  val value = entry.get(1, dataType.valueType)
-  valueArray.update(i, value)
+  mapBuilder.put(entries.getStruct(i, 2))
   i += 1
 }
-ArrayBasedMapData(keyArray, valueArray)
+mapBuilder.build()
   }
 
   override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
 nullSafeCodeGen(ctx, ev, c => {
   val numEntries = ctx.freshName("numEntries")
-  val isKeyPrimitive = CodeGenerator.isPrimitiveType(dataType.keyType)
-  val isValuePrimitive = 
CodeGenerator.isPrimitiveType(dataType.valueType)
-  val code = if (isKeyPrimitive && isValuePrimitive) {
-genCodeForPrimitiveElements(ctx, c, ev.value, numEntries)
--- End diff --

since we need to check for duplicated map keys, it's not possible to apply this 
trick anymore, as we need to overwrite values if the key has appeared before.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235849697
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
@@ -125,22 +125,36 @@ object InternalRow {
* actually takes a `SpecializedGetters` input because it can be 
generalized to other classes
* that implements `SpecializedGetters` (e.g., `ArrayData`) too.
*/
-  def getAccessor(dataType: DataType): (SpecializedGetters, Int) => Any = 
dataType match {
-case BooleanType => (input, ordinal) => input.getBoolean(ordinal)
-case ByteType => (input, ordinal) => input.getByte(ordinal)
-case ShortType => (input, ordinal) => input.getShort(ordinal)
-case IntegerType | DateType => (input, ordinal) => 
input.getInt(ordinal)
-case LongType | TimestampType => (input, ordinal) => 
input.getLong(ordinal)
-case FloatType => (input, ordinal) => input.getFloat(ordinal)
-case DoubleType => (input, ordinal) => input.getDouble(ordinal)
-case StringType => (input, ordinal) => input.getUTF8String(ordinal)
-case BinaryType => (input, ordinal) => input.getBinary(ordinal)
-case CalendarIntervalType => (input, ordinal) => 
input.getInterval(ordinal)
-case t: DecimalType => (input, ordinal) => input.getDecimal(ordinal, 
t.precision, t.scale)
-case t: StructType => (input, ordinal) => input.getStruct(ordinal, 
t.size)
-case _: ArrayType => (input, ordinal) => input.getArray(ordinal)
-case _: MapType => (input, ordinal) => input.getMap(ordinal)
-case u: UserDefinedType[_] => getAccessor(u.sqlType)
-case _ => (input, ordinal) => input.get(ordinal, dataType)
+  def getAccessor(dt: DataType, nullable: Boolean = true): 
(SpecializedGetters, Int) => Any = {
--- End diff --

I can move it to a new PR if others think it's necessary. It's a little 
dangerous to ask the caller side to take care of null values.
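
A minimal sketch of the null-safe wrapping being discussed (simplified, not the actual Spark implementation; only two type cases shown):

```scala
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.types.{DataType, IntegerType}

// The accessor itself checks isNullAt, so callers can never forget the null check.
def getAccessorSketch(dt: DataType, nullable: Boolean = true): (SpecializedGetters, Int) => Any = {
  val raw: (SpecializedGetters, Int) => Any = dt match {
    case IntegerType => (input, ordinal) => input.getInt(ordinal)
    case _ => (input, ordinal) => input.get(ordinal, dt)
  }
  if (nullable) {
    (input, ordinal) => if (input.isNullAt(ordinal)) null else raw(input, ordinal)
  } else {
    raw
  }
}
```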


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

2018-11-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23124
  
cc @dongjoon-hyun @gatorsmile @viirya @kiszk @mgaido91 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-22 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/23124

[SPARK-25829][SQL] remove duplicated map keys with last wins policy

## What changes were proposed in this pull request?

Currently duplicated map keys are not handled consistently. For example, 
map lookup respects the duplicated key that appears first, `Dataset.collect` only 
keeps the duplicated key that appears last, `MapKeys` returns duplicated keys, etc.

This PR proposes to remove duplicated map keys with a last-wins policy, to 
follow Java/Scala and Presto. It only applies to built-in functions, as users 
can create maps with duplicated keys via private APIs anyway.
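
A small illustration of the intended last-wins semantics (hypothetical spark-shell session; exact output formatting may differ):

```scala
import org.apache.spark.sql.functions.{lit, map}

val df = spark.range(1).select(map(lit(1), lit("a"), lit(1), lit("b")).as("m"))
df.collect()
// With the last-wins policy, the duplicated key 1 keeps the last value,
// i.e. the collected map is Map(1 -> "b") rather than Map(1 -> "a").
```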

For other places:
1. data source v1 doesn't have this problem, as users need to provide a 
java/scala map, which can't have duplicated keys.
2. data source v2 may have this problem. I've added a note to 
`ArrayBasedMapData` to ask the caller to take care of duplicated keys. In the 
future we should enforce it in the stable data APIs for data source v2.
3. UDF doesn't have this problem, as users need to provide a java/scala 
map. Same as data source v1.
4. file format. I checked all of them and only parquet does not enforce it. 
For backward compatibility reasons I change nothing, but leave a note saying 
that the behavior will be undefined if users write maps with duplicated keys to 
parquet files. Maybe we can add a config and fail by default if parquet files 
have maps with duplicated keys. This can be done in a follow-up.

## How was this patch tested?

updated tests and new tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark map

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23124.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23124


commit cbcd5d7a937f8120ef8527f1f26150ed93f1de0a
Author: Wenchen Fan 
Date:   2018-11-15T02:49:22Z

remove duplicated map keys with last wins policy




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21732: [SPARK-24762][SQL] Enable Option of Product encod...

2018-11-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21732#discussion_r235840903
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ---
@@ -257,6 +251,11 @@ case class ExpressionEncoder[T](
*/
   def isSerializedAsStruct: Boolean = 
objSerializer.dataType.isInstanceOf[StructType]
 
+  /**
+   * Returns true if the type `T` is `Option`.
+   */
+  def isOptionType: Boolean = 
classOf[Option[_]].isAssignableFrom(clsTag.runtimeClass)
--- End diff --

sorry typo: `isSerializedAsStruct && !isOption`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21732: [SPARK-24762][SQL] Enable Option of Product encod...

2018-11-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21732#discussion_r235840884
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ---
@@ -257,6 +251,11 @@ case class ExpressionEncoder[T](
*/
   def isSerializedAsStruct: Boolean = 
objSerializer.dataType.isInstanceOf[StructType]
 
+  /**
+   * Returns true if the type `T` is `Option`.
+   */
+  def isOptionType: Boolean = 
classOf[Option[_]].isAssignableFrom(clsTag.runtimeClass)
--- End diff --

what do you mean? What I asked for is a code-style change, to make the code 
more maintainable.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23105: [SPARK-26140] Enable custom metrics implementatio...

2018-11-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23105#discussion_r235834225
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala 
---
@@ -48,7 +48,8 @@ private[spark] trait ShuffleManager {
   handle: ShuffleHandle,
   startPartition: Int,
   endPartition: Int,
-  context: TaskContext): ShuffleReader[K, C]
+  context: TaskContext,
+  metrics: ShuffleMetricsReporter): ShuffleReader[K, C]
--- End diff --

IIUC, we should pass a read metrics reporter here, as this method is 
`getReader`, which is called by the reducers to read shuffle files.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23105: [SPARK-26140] Enable custom metrics implementatio...

2018-11-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23105#discussion_r235834136
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+/**
+ * An interface for reporting shuffle read metrics, for each shuffle. This 
interface assumes
+ * all the methods are called on a single-threaded, i.e. concrete 
implementations would not need
+ * to synchronize.
+ *
+ * All methods have additional Spark visibility modifier to allow public, 
concrete implementations
+ * that still have these methods marked as private[spark].
+ */
+private[spark] trait ShuffleReadMetricsReporter {
--- End diff --

how do we plan to use this interface later on? It's not used in this PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23105: [SPARK-26140] Enable custom metrics implementatio...

2018-11-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23105#discussion_r235834088
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleMetricsReporter.scala ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+/**
+ * An interface for reporting shuffle information, for each shuffle. This 
interface assumes
--- End diff --

`for each shuffle` -> `for each reducer of a shuffle`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23043: [SPARK-26021][SQL] replace minus zero with zero in Platf...

2018-11-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23043
  
thanks, merging to master/2.4!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23054: [SPARK-26085][SQL] Key attribute of non-struct type unde...

2018-11-21 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23054
  
thanks, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23052: [SPARK-26081][SQL] Prevent empty files for empty partiti...

2018-11-21 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23052
  
First of all, sometimes we do need to write "empty" files, so that we can 
infer the schema of a parquet directory. An empty parquet file is not really empty, as 
it has a header/footer. https://github.com/apache/spark/pull/20525 guarantees we 
always write out at least one empty file.

One important thing is, when we write out an empty dataframe to a file and 
read it back, it should still be an empty dataframe. I'd suggest we skip empty 
files in text-based data sources, and later on send a follow-up PR to not write 
empty text files, as a perf improvement.
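
The round-trip property above, as it would look from the user side (the path and column names are illustrative):

```scala
val empty = spark.range(0).selectExpr("id", "cast(id as string) as s")
empty.write.mode("overwrite").parquet("/tmp/empty_parquet_table")

val back = spark.read.parquet("/tmp/empty_parquet_table")
assert(back.count() == 0)                                      // still an empty dataframe
assert(back.schema.fieldNames.sameElements(Array("id", "s")))  // schema is still inferable
```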


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...

2018-11-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22938#discussion_r235584943
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1892,7 +1898,7 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 .text(path)
 
   val jsonDF = spark.read.option("multiLine", true).option("mode", 
"PERMISSIVE").json(path)
-  assert(jsonDF.count() === corruptRecordCount)
+  assert(jsonDF.count() === corruptRecordCount + 1) // null row for 
empty file
--- End diff --

shall we skip empty files for all the file-based data sources?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22938: [SPARK-25935][SQL] Prevent null rows from JSON parser

2018-11-21 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22938
  
thanks, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23043: [SPARK-26021][SQL] replace minus zero with zero in Platf...

2018-11-21 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23043
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22938: [SPARK-25935][SQL] Prevent null rows from JSON parser

2018-11-21 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22938
  
LGTM except for the migration guide. The JSON data source can't produce null rows, 
but skips them even in permissive mode.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23054: [SPARK-26085][SQL] Key attribute of non-struct type unde...

2018-11-21 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23054
  
hmmm it conflicts again...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23043: [SPARK-26021][SQL] replace minus zero with zero in Platf...

2018-11-21 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23043
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21732: [SPARK-24762][SQL] Enable Option of Product encoders

2018-11-20 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/21732
  
last comment, LGTM otherwise


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21732: [SPARK-24762][SQL] Enable Option of Product encod...

2018-11-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21732#discussion_r235273262
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ---
@@ -257,6 +251,11 @@ case class ExpressionEncoder[T](
*/
   def isSerializedAsStruct: Boolean = 
objSerializer.dataType.isInstanceOf[StructType]
 
+  /**
+   * Returns true if the type `T` is `Option`.
+   */
+  def isOptionType: Boolean = 
classOf[Option[_]].isAssignableFrom(clsTag.runtimeClass)
--- End diff --

regarding maintainability, I think it's better to add a 
```
// maybe there is a better naming
def isSerializedAsStructForTopLevel: Boolean = {
  isSerializedAsStruct && isOption
}
```

The benefit is, when people call `isSerializedAsStruct`, they will see this 
similar method and think carefully about which one they should use. Otherwise, people 
may easily miss the `isOptionType` check.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22149: [SPARK-25158][SQL]Executor accidentally exit beca...

2018-11-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22149#discussion_r235266375
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
 ---
@@ -308,6 +308,12 @@ private class ScriptTransformationWriterThread(
   }
   threwException = false
 } catch {
+  // TaskKilledException should not be thrown again, otherwise it will 
be captured by
+  // SparkUncaughtExceptionHandler, then Executor will exit because of 
TaskKilledException.
+  case e: TaskKilledException =>
--- End diff --

I see. So `ScriptTransformationExec` is special because
1. it starts a new thread
2. the new thread is very likely to throw `TaskKilledException`, when 
speculation is on.

I think we should not kill the executor just because 
`ScriptTransformationWriterThread` fails. We should log the error, instead of 
throwing it.
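
A minimal sketch of the suggested handling (illustrative only, not the actual patch):

```scala
import org.apache.spark.TaskKilledException

// Swallow the kill signal inside the writer thread and record it, so that
// SparkUncaughtExceptionHandler never sees it and the executor keeps running.
def runWriterGuarded(body: => Unit): Unit = {
  try {
    body
  } catch {
    case e: TaskKilledException =>
      // The real code would go through Spark's logError/logWarning here.
      System.err.println(s"Writer thread stopped because the task was killed: $e")
  }
}
```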


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23054: [SPARK-26085][SQL] Key attribute of non-struct type unde...

2018-11-20 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23054
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22149: [SPARK-25158][SQL]Executor accidentally exit beca...

2018-11-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22149#discussion_r235253094
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
 ---
@@ -308,6 +308,12 @@ private class ScriptTransformationWriterThread(
   }
   threwException = false
 } catch {
+  // TaskKilledException should not be thrown again, otherwise it will 
be captured by
+  // SparkUncaughtExceptionHandler, then Executor will exit because of 
TaskKilledException.
+  case e: TaskKilledException =>
--- End diff --

> Actually ScriptTransformationExec like a streaming pipe, but other 
operators basically no such characteristics.

can you elaborate on it? How is `TaskKilledException` handled in other 
operators?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23054: [SPARK-26085][SQL] Key attribute of non-struct ty...

2018-11-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23054#discussion_r235238609
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1594,6 +1594,15 @@ object SQLConf {
 "WHERE, which does not follow SQL standard.")
   .booleanConf
   .createWithDefault(false)
+
+  val ALIAS_NON_STRUCT_GROUPING_KEY_AS_VALUE =
+buildConf("spark.sql.legacy.dataset.aliasNonStructGroupingKeyAsValue")
+  .internal()
+  .doc("When set to true, the key attribute resulted from running 
`Dataset.groupByKey` " +
+"for non-struct key type, will be named as `value`, following the 
behavior of Spark " +
--- End diff --

so it's "named" as value, not "alias" to value. I think it's more precise 
to call this config `spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22149: [SPARK-25158][SQL]Executor accidentally exit beca...

2018-11-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22149#discussion_r235238304
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
 ---
@@ -308,6 +308,12 @@ private class ScriptTransformationWriterThread(
   }
   threwException = false
 } catch {
+  // TaskKilledException should not be thrown again, otherwise it will 
be captured by
+  // SparkUncaughtExceptionHandler, then Executor will exit because of 
TaskKilledException.
+  case e: TaskKilledException =>
--- End diff --

which line of code may throw this exception? and how is this exception 
handled in other operators?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23043: [SPARK-26021][SQL] replace minus zero with zero i...

2018-11-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23043#discussion_r235237427
  
--- Diff: 
common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java ---
@@ -157,4 +159,15 @@ public void heapMemoryReuse() {
 Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7);
 Assert.assertEquals(obj3, onheap4.getBaseObject());
   }
+
+  @Test
+  // SPARK-26021
+  public void writeMinusZeroIsReplacedWithZero() {
+byte[] doubleBytes = new byte[Double.BYTES];
+byte[] floatBytes = new byte[Float.BYTES];
+Platform.putDouble(doubleBytes, Platform.BYTE_ARRAY_OFFSET, -0.0d);
+Platform.putFloat(floatBytes, Platform.BYTE_ARRAY_OFFSET, -0.0f);
+Assert.assertEquals(0, Double.compare(0.0d, 
ByteBuffer.wrap(doubleBytes).getDouble()));
+Assert.assertEquals(0, Float.compare(0.0f, 
ByteBuffer.wrap(floatBytes).getFloat()));
--- End diff --

and it would be better to directly check that the binary representations of `0.0` and `-0.0` 
are the same.
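
A sketch of that check, reading the value back with the matching `Platform` getter and comparing the raw bits against `0.0` (assumes the `putDouble` change in this PR):

```scala
import org.apache.spark.unsafe.Platform

val bytes = new Array[Byte](java.lang.Double.BYTES)
Platform.putDouble(bytes, Platform.BYTE_ARRAY_OFFSET, -0.0d)

// Read it back the same way it was written, then compare the raw bits with 0.0.
val readBack = Platform.getDouble(bytes, Platform.BYTE_ARRAY_OFFSET)
assert(java.lang.Double.doubleToRawLongBits(readBack) ==
  java.lang.Double.doubleToRawLongBits(0.0d))
```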


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23043: [SPARK-26021][SQL] replace minus zero with zero in Platf...

2018-11-20 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23043
  
LGTM except one tiny comment


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23043: [SPARK-26021][SQL] replace minus zero with zero i...

2018-11-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23043#discussion_r235237048
  
--- Diff: 
common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java ---
@@ -157,4 +159,15 @@ public void heapMemoryReuse() {
 Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7);
 Assert.assertEquals(obj3, onheap4.getBaseObject());
   }
+
+  @Test
+  // SPARK-26021
+  public void writeMinusZeroIsReplacedWithZero() {
+byte[] doubleBytes = new byte[Double.BYTES];
+byte[] floatBytes = new byte[Float.BYTES];
+Platform.putDouble(doubleBytes, Platform.BYTE_ARRAY_OFFSET, -0.0d);
+Platform.putFloat(floatBytes, Platform.BYTE_ARRAY_OFFSET, -0.0f);
+Assert.assertEquals(0, Double.compare(0.0d, 
ByteBuffer.wrap(doubleBytes).getDouble()));
+Assert.assertEquals(0, Float.compare(0.0f, 
ByteBuffer.wrap(floatBytes).getFloat()));
--- End diff --

can we use `Platform.getFloat` to read the value back, to match how we 
write it?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23043: [SPARK-26021][SQL] replace minus zero with zero in Platf...

2018-11-20 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23043
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23084: [SPARK-26117][CORE][SQL]use SparkOutOfMemoryError instea...

2018-11-20 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23084
  
> for example, HashAggregateExec will carry out spill the map and fallback 
to sort-based

Do you mean this patch changes nothing for this case?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r234855810
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * A physical representation of a data source scan for batch queries. This 
interface is used to
+ * provide physical information, like how many partitions the scanned data 
has, and how to read
+ * records from the partitions.
+ */
+@InterfaceStability.Evolving
+public interface Batch {
--- End diff --

I don't have a strong preference. I feel it's a little clearer to 
distinguish between scan and batch.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23069: [SPARK-26026][BUILD] Published Scaladoc jars missing fro...

2018-11-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23069
  
Is it a requirement to put the annotation class at the top level?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23043: [SPARK-26021][SQL] replace minus zero with zero i...

2018-11-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23043#discussion_r234847102
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala ---
@@ -723,4 +723,32 @@ class DataFrameAggregateSuite extends QueryTest with 
SharedSQLContext {
   "grouping expressions: [current_date(None)], value: [key: int, 
value: string], " +
 "type: GroupBy]"))
   }
+
+  test("SPARK-26021: Double and Float 0.0/-0.0 should be equal when 
grouping") {
+val colName = "i"
+def groupByCollect(df: DataFrame): Array[Row] = {
+  df.groupBy(colName).count().collect()
+}
+def assertResult[T](result: Array[Row], zero: T)(implicit ordering: 
Ordering[T]): Unit = {
+  assert(result.length == 1)
+  // using compare since 0.0 == -0.0 is true
+  assert(ordering.compare(result(0).getAs[T](0), zero) == 0)
--- End diff --

ah sorry I misread the code. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23043: [SPARK-26021][SQL] replace minus zero with zero i...

2018-11-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23043#discussion_r234847137
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala ---
@@ -723,4 +723,32 @@ class DataFrameAggregateSuite extends QueryTest with 
SharedSQLContext {
   "grouping expressions: [current_date(None)], value: [key: int, 
value: string], " +
 "type: GroupBy]"))
   }
+
+  test("SPARK-26021: Double and Float 0.0/-0.0 should be equal when 
grouping") {
+val colName = "i"
+def groupByCollect(df: DataFrame): Array[Row] = {
+  df.groupBy(colName).count().collect()
+}
+def assertResult[T](result: Array[Row], zero: T)(implicit ordering: 
Ordering[T]): Unit = {
+  assert(result.length == 1)
+  // using compare since 0.0 == -0.0 is true
+  assert(ordering.compare(result(0).getAs[T](0), zero) == 0)
+  assert(result(0).getLong(1) == 3)
+}
+
+spark.conf.set("spark.sql.codegen.wholeStage", "false")
+val doubles =
+  groupByCollect(Seq(0.0d, 0.0d, -0.0d).toDF(colName))
+val doublesBoxed =
+  groupByCollect(Seq(Double.box(0.0d), Double.box(0.0d), 
Double.box(-0.0d)).toDF(colName))
+val floats =
+  groupByCollect(Seq(0.0f, -0.0f, 0.0f).toDF(colName))
--- End diff --

why do we have to turn off whole-stage codegen?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r234844186
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * The base interface for v2 data sources which don't have a real catalog. 
Implementations must
+ * have a public, 0-arg constructor.
+ *
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ */
+@InterfaceStability.Evolving
+// TODO: do not extend `DataSourceV2`, after we finish the API refactor 
completely.
+public interface TableProvider extends DataSourceV2 {
+
+  /**
+   * Return a {@link Table} instance to do read/write with user-specified 
options.
+   *
+   * @param options the user-specified options that can identify a table, 
e.g. file path, Kafka
+   *topic name, etc. It's an immutable case-insensitive 
string-to-string map.
+   */
+  Table getTable(DataSourceOptions options);
+
+  /**
+   * Return a {@link Table} instance to do read/write with user-specified 
schema and options.
+   *
+   * By default this method throws {@link UnsupportedOperationException}, 
implementations should
+   * override this method to handle user-specified schema.
+   *
+   * @param options the user-specified options that can identify a table, 
e.g. file path, Kafka
+   *topic name, etc. It's an immutable case-insensitive 
string-to-string map.
+   * @param schema the user-specified schema.
+   */
+  default Table getTable(DataSourceOptions options, StructType schema) {
--- End diff --

It's a different thing. Think about when you are reading a parquet file: you 
know exactly what its physical schema is, and you don't want Spark to waste a 
job inferring the schema. Then you can specify the schema when reading.

Next, Spark will analyze the query and figure out what the required schema 
is. This step is automatic and driven by Spark.
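
From the user's point of view, the two schemas being contrasted could look like this (paths and column names are illustrative):

```scala
// 1. User-specified (physical) schema: the user tells Spark what the files look like,
//    so no schema-inference job is needed.
val df = spark.read
  .schema("a INT, b STRING, c DOUBLE")
  .parquet("/path/to/table")

// 2. Required schema: derived automatically by Spark from the query; with column
//    pruning only `a` actually needs to be read from the files.
df.select("a").collect()
```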


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23054: [SPARK-26085][SQL] Key attribute of non-struct type unde...

2018-11-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23054
  
sorry it conflicts, can you resolve it? I think it's ready to go


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23079: [SPARK-26107][SQL] Extend ReplaceNullWithFalseInPredicat...

2018-11-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23079
  
thanks, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23084: [SPARK-26117][CORE][SQL]use SparkOutOfMemoryError instea...

2018-11-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23084
  
good catch! thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23084: [SPARK-26117][CORE][SQL]use SparkOutOfMemoryError instea...

2018-11-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23084
  
add to whitelist


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23084: [SPARK-26117][CORE][SQL]use SparkOutOfMemoryError instea...

2018-11-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23084
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23084: [SPARK-26117][CORE][SQL]use SparkOutOfMemoryError...

2018-11-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23084#discussion_r234661975
  
--- Diff: 
core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java ---
@@ -741,7 +742,7 @@ public boolean append(Object kbase, long koff, int 
klen, Object vbase, long voff
 if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) 
{
   try {
 growAndRehash();
-  } catch (OutOfMemoryError oom) {
+  } catch (SparkOutOfMemoryError oom) {
--- End diff --

do you know what the behavior was before? Will we propagate the exception 
all the way up and kill the executor?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23086: [SPARK-25528][SQL] data source v2 API refactor (batch re...

2018-11-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23086
  
@rxin @rdblue @jose-torres @gatorsmile @gengliangwang @mccheah 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r234650248
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala
 ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, 
WholeStageCodegenExec}
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.DataSourceV2
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory,
 ContinuousReadSupport, MicroBatchReadSupport}
+
+/**
+ * Physical plan node for scanning data from a data source.
+ */
+// TODO: micro-batch should be handled by `DataSourceV2ScanExec`, after we 
finish the API refactor
+// completely.
+case class DataSourceV2StreamingScanExec(
--- End diff --

I have to use two physical nodes, since batch and streaming have different 
APIs now.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-19 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/23086

[SPARK-25528][SQL] data source v2 API refactor (batch read)

## What changes were proposed in this pull request?

This is the first step of the data source v2 API refactor 
[proposal](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing)

It adds the new API for batch read, without removing the old APIs, as they 
are still needed for streaming sources.

More concretely, it adds
1. `TableProvider`, which works like an anonymous catalog
2. `Table`, which represents a structured data set
3. `ScanBuilder` and `Scan`, a logical representation of a data source scan
4. `Batch`, a physical representation of a data source batch scan (a rough usage sketch follows below)
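
A rough sketch of how these abstractions chain together for a batch read (the helper method and how the provider/options are obtained are illustrative, and it assumes `ScanBuilder.build()` produces the `Scan` as in the proposal):

```scala
import org.apache.spark.sql.sources.v2.{DataSourceOptions, Table, TableProvider}
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}

def planBatchScan(provider: TableProvider, options: DataSourceOptions): Scan = {
  val table: Table = provider.getTable(options)             // the logical data set
  val builder: ScanBuilder = table.newScanBuilder(options)  // operator pushdown happens here
  builder.build()                                           // the Scan, later turned into a Batch by Spark
}
```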

## How was this patch tested?

existing tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark refactor-batch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23086.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23086


commit f06b5c58b1a890d425abd575fa6f4c40da7c4b3d
Author: Wenchen Fan 
Date:   2018-11-19T11:05:07Z

data source v2 API refactor (batch read)




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-11-19 Thread cloud-fan
Github user cloud-fan closed the pull request at:

https://github.com/apache/spark/pull/22547


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23045: [SPARK-26071][SQL] disallow map as map key

2018-11-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23045
  
thanks, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23079: [SPARK-26107][SQL] Extend ReplaceNullWithFalseInP...

2018-11-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23079#discussion_r234639734
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
 ---
@@ -767,6 +767,15 @@ object ReplaceNullWithFalse extends Rule[LogicalPlan] {
   replaceNullWithFalse(cond) -> value
 }
 cw.copy(branches = newBranches)
+  case af @ ArrayFilter(_, lf @ LambdaFunction(func, _, _)) =>
--- End diff --

ah, I see. Sorry I missed it. Then it's safer to use a whitelist here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23025: [SPARK-26024][SQL]: Update documentation for repartition...

2018-11-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23025
  
thanks, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23082: [SPARK-26112][SQL] Update since versions of new built-in...

2018-11-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23082
  
thanks, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23040: [SPARK-26068][Core]ChunkedByteBufferInputStream should h...

2018-11-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23040
  
thanks, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


