[GitHub] spark issue #23143: [SPARK-24762][SQL][Followup] Enable Option of Product en...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23143 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236514039 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.v2.reader.Scan; +import org.apache.spark.sql.sources.v2.reader.ScanBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * An interface representing a logical structured data set of a data source. For example, the + * implementation can be a directory on the file system, a topic of Kafka, or a table in the + * catalog, etc. + * + * This interface can mixin the following interfaces to support different operations: + * + * {@link SupportsBatchRead}: this table can be read in batch queries. + * + */ +@Evolving +public interface Table { + + /** + * Returns the schema of this table. + */ + StructType schema(); + + /** + * Returns a {@link ScanBuilder} which can be used to build a {@link Scan} later. Spark will call + * this method for each data scanning query. 
+ * + * The builder can take some query specific information to do operators pushdown, and keep these + * information in the created {@link Scan}. + */ + ScanBuilder newScanBuilder(DataSourceOptions options); --- End diff -- I agree with it. Since `CaseInsensitiveStringMap` is not in the code base yet, shall we do it in the followup?
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236513622 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; --- End diff -- why does this `Table` API need to be in catalyst? It's not even a plan. We can define a table plan interface in catalyst, and implement it in the SQL module with this `Table` API.
[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and RDDScan...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23127 there are still 2 golden file test failures because of the plan change...
[GitHub] spark pull request #23138: [SPARK-23356][SQL][TEST] add new test cases for a...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23138#discussion_r236334056 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala --- @@ -196,4 +196,31 @@ class SetOperationSuite extends PlanTest { )) comparePlans(expectedPlan, rewrittenPlan) } + + test("SPARK-23356 union: expressions with number in project list are pushed down") { --- End diff -- `expressions with number` -> `expressions with literal`
[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23127#discussion_r236333530 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport { override def limitNotReachedChecks: Seq[String] = Nil } +/** + * Leaf codegen node reading from a single RDD. + */ +trait InputRDDCodegen extends CodegenSupport { + + def inputRDD: RDD[InternalRow] + + // If the input is an RDD of InternalRow which are potentially not UnsafeRow, + // and there is no parent to consume it, it needs an UnsafeProjection. + protected val createUnsafeProjection: Boolean = (parent == null) + + override def inputRDDs(): Seq[RDD[InternalRow]] = { +inputRDD :: Nil + } + + override def doProduce(ctx: CodegenContext): String = { +// Inline mutable state since an InputRDDCodegen is used once in a task for WholeStageCodegen +val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];", + forceInline = true) +val row = ctx.freshName("row") + +val outputVars = if (createUnsafeProjection) { + // creating the vars will make the parent consume add an unsafe projection. + ctx.INPUT_ROW = row + ctx.currentVars = null + output.zipWithIndex.map { case (a, i) => +BoundReference(i, a.dataType, a.nullable).genCode(ctx) + } +} else { + null +} + +val numOutputRowsCode = if (metrics.contains("numOutputRows")) { --- End diff -- how about `updateNumOutputRowsMetrics` instead of `numOutputRowsCode`?
[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23127#discussion_r236332786 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport { override def limitNotReachedChecks: Seq[String] = Nil } +/** + * Leaf codegen node reading from a single RDD. + */ +trait InputRDDCodegen extends CodegenSupport { + + def inputRDD: RDD[InternalRow] + + // If the input is an RDD of InternalRow which are potentially not UnsafeRow, + // and there is no parent to consume it, it needs an UnsafeProjection. + protected val createUnsafeProjection: Boolean = (parent == null) + + override def inputRDDs(): Seq[RDD[InternalRow]] = { +inputRDD :: Nil + } + + override def doProduce(ctx: CodegenContext): String = { --- End diff -- the extra `if createUnsafeProjection` check?
[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23127#discussion_r236332511 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport { override def limitNotReachedChecks: Seq[String] = Nil } +/** + * Leaf codegen node reading from a single RDD. + */ +trait InputRDDCodegen extends CodegenSupport { + + def inputRDD: RDD[InternalRow] + + // If the input is an RDD of InternalRow which are potentially not UnsafeRow, + // and there is no parent to consume it, it needs an UnsafeProjection. + protected val createUnsafeProjection: Boolean = (parent == null) + + override def inputRDDs(): Seq[RDD[InternalRow]] = { +inputRDD :: Nil + } + + override def doProduce(ctx: CodegenContext): String = { --- End diff -- can you highlight the difference between this one and the previous `RowDataSourceScanExec.doProduce`?
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r236282401 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType} + +/** + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes + * duplicated map keys w.r.t. the last wins policy. + */ +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable { + assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map") + + private lazy val keyToIndex = keyType match { +case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int] +case _ => + // for complex types, use interpreted ordering to be able to compare unsafe data with safe + // data, e.g. UnsafeRow vs GenericInternalRow. 
+ mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType)) + } + + // TODO: specialize it + private lazy val keys = mutable.ArrayBuffer.empty[Any] + private lazy val values = mutable.ArrayBuffer.empty[Any] + + private lazy val keyGetter = InternalRow.getAccessor(keyType) + private lazy val valueGetter = InternalRow.getAccessor(valueType) + + def reset(): Unit = { +keyToIndex.clear() +keys.clear() +values.clear() + } + + def put(key: Any, value: Any): Unit = { +if (key == null) { + throw new RuntimeException("Cannot use null as map key.") +} + +val maybeExistingIdx = keyToIndex.get(key) +if (maybeExistingIdx.isDefined) { + // Overwrite the previous value, as the policy is last wins. + values(maybeExistingIdx.get) = value +} else { + keyToIndex.put(key, values.length) + keys.append(key) + values.append(value) +} + } + + // write a 2-field row, the first field is key and the second field is value. + def put(entry: InternalRow): Unit = { +if (entry.isNullAt(0)) { + throw new RuntimeException("Cannot use null as map key.") +} +put(keyGetter(entry, 0), valueGetter(entry, 1)) + } + + def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = { +if (keyArray.length != valueArray.length) { + throw new RuntimeException( +"The key array and value array of MapData must have the same length.") +} + +var i = 0 +while (i < keyArray.length) { + put(keyArray(i), valueArray(i)) + i += 1 +} + } + + def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = { +if (keyArray.numElements() != valueArray.numElements()) { + throw new RuntimeException( +"The key array and value array of MapData must have the same length.") +} + +var i = 0 +while (i < keyArray.numElements()) { + put(keyGetter(keyArray, i), valueGetter(valueArray, i)) + i += 1 +} + } + + def build(): ArrayBasedMapData = { +new ArrayBasedMapData(new GenericArrayData(keys.toArray), new GenericArrayData(values.toArray)) + } + + def from(keyArray: ArrayData, valueArray: ArrayData): ArrayBasedMapData = 
{ +assert(keyToIndex.isEmpty, "'from' can only be called with a fresh GenericMapBuilder.") +putAll(keyArray, valueArray) --- End diff -- no we can't, as we still need to detect duplicated keys.
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r236275822 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres override def nullable: Boolean = children.exists(_.nullable) + private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType) + override def eval(input: InternalRow): Any = { -val maps = children.map(_.eval(input)) +val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray --- End diff -- `toArray` is O(N) but we do it only once. If accessing by index is O(N), the total time complexity is O(N ^ 2).
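To illustrate the complexity argument in the comment above, here is a minimal sketch in plain Scala. It assumes the evaluated children arrive as a linked `List`, where `xs(i)` has to walk `i` nodes; `IndexAccessSketch` and its method names are hypothetical and not part of the PR.

```scala
object IndexAccessSketch {
  // Sums elements by index directly on the Seq. If xs is a linked List,
  // each xs(i) walks i nodes, so the whole loop is O(N^2).
  def sumByIndex(xs: Seq[Int]): Long = {
    val n = xs.length
    var total = 0L
    var i = 0
    while (i < n) {
      total += xs(i)
      i += 1
    }
    total
  }

  // Copies to an Array once (O(N)); afterwards every arr(i) is O(1),
  // so the loop is O(N) overall.
  def sumViaArray(xs: Seq[Int]): Long = {
    val arr = xs.toArray
    var total = 0L
    var i = 0
    while (i < arr.length) {
      total += arr(i)
      i += 1
    }
    total
  }
}
```

Both methods return the same result; only the cost of the indexed reads differs, which is why a one-off `toArray` can pay for itself.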
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r236274428 --- Diff: docs/sql-migration-guide-upgrade.md --- @@ -19,6 +19,8 @@ displayTitle: Spark SQL Upgrading Guide - In Spark version 2.4 and earlier, users can create map values with map type key via built-in function like `CreateMap`, `MapFromArrays`, etc. Since Spark 3.0, it's not allowed to create map values with map type key with these built-in functions. Users can still read map values with map type key from data source or Java/Scala collections, though they are not very useful. + - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, these built-in functions will remove duplicated map keys with last wins policy. --- End diff -- They are related, but they are not the same. For example, we don't support map type as key, because we can't check equality of map type correctly. This is just a current implementation limitation, and we may relax it in the future. Duplicated map keys is a real problem and we will never allow it.
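The "last wins" policy described in the migration note can be sketched with plain Scala collections. This is only an illustration modelled on the `put` logic quoted in the `ArrayBasedMapBuilder` diff; `LastWinsSketch` and `build` are hypothetical names, and the sketch works on ordinary tuples, not on Spark's `ArrayData`.

```scala
import scala.collection.mutable

object LastWinsSketch {
  // Builds parallel key/value vectors from entries. A null key is rejected,
  // and a repeated key keeps its first position but takes the last value.
  def build[K, V](entries: Seq[(K, V)]): (Vector[K], Vector[V]) = {
    val keyToIndex = mutable.HashMap.empty[K, Int]
    val keys = mutable.ArrayBuffer.empty[K]
    val values = mutable.ArrayBuffer.empty[V]
    entries.foreach { case (k, v) =>
      if (k == null) {
        throw new RuntimeException("Cannot use null as map key.")
      }
      keyToIndex.get(k) match {
        case Some(idx) =>
          // Overwrite the previous value, as the policy is last wins.
          values(idx) = v
        case None =>
          keyToIndex.put(k, values.length)
          keys += k
          values += v
      }
    }
    (keys.toVector, values.toVector)
  }
}
```

For example, `LastWinsSketch.build(Seq("a" -> 1, "b" -> 2, "a" -> 3))` keeps two keys, with `"a"` mapped to the later value `3`.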
[GitHub] spark pull request #23143: [SPARK-24762][SQL][Followup] Enable Option of Pro...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23143#discussion_r236267692 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala --- @@ -251,19 +251,15 @@ case class ExpressionEncoder[T]( */ def isSerializedAsStruct: Boolean = objSerializer.dataType.isInstanceOf[StructType] - /** - * Returns true if the type `T` is an `Option` type. - */ - def isOptionType: Boolean = classOf[Option[_]].isAssignableFrom(clsTag.runtimeClass) - /** * If the type `T` is serialized as a struct, when it is encoded to a Spark SQL row, fields in * the struct are naturally mapped to top-level columns in a row. In other words, the serialized * struct is flattened to row. But in case of the `T` is also an `Option` type, it can't be * flattened to top-level row, because in Spark SQL top-level row can't be null. This method * returns true if `T` is serialized as struct and is not `Option` type. */ - def isSerializedAsStructForTopLevel: Boolean = isSerializedAsStruct && !isOptionType + def isSerializedAsStructForTopLevel: Boolean = isSerializedAsStruct && --- End diff -- nit: since it crosses lines, I'd prefer ``` def xxx = { xxx } ```
[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23127#discussion_r236151394 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala --- @@ -199,4 +199,6 @@ case class RDDScanExec( override def simpleString: String = { s"$nodeName${truncatedString(output, "[", ",", "]")}" } + + override def inputRDD: RDD[InternalRow] = rdd --- End diff -- The test failure is real. `RDDScanExec.rdd` may not be an RDD of unsafe rows. Maybe we should enforce that `InputRDDCodegen.inputRDD` is `RDD[UnsafeRow]`.
[GitHub] spark pull request #23139: [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule Repla...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23139#discussion_r236150260 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, ArrayFilter, CaseWhen, Expression, If} +import org.apache.spark.sql.catalyst.expressions.{LambdaFunction, Literal, MapFilter, Or} +import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.BooleanType + + +/** + * A rule that replaces `Literal(null, BooleanType)` with `FalseLiteral`, if possible, in the search + * condition of the WHERE/HAVING/ON(JOIN) clauses, which contain an implicit Boolean operator + * "(search condition) = TRUE". The replacement is only valid when `Literal(null, BooleanType)` is + * semantically equivalent to `FalseLiteral` when evaluating the whole search condition. 
+ * + * Please note that FALSE and NULL are not exchangeable in most cases, when the search condition + * contains NOT and NULL-tolerant expressions. Thus, the rule is very conservative and applicable + * in very limited cases. + * + * For example, `Filter(Literal(null, BooleanType))` is equal to `Filter(FalseLiteral)`. + * + * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`; + * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually + * `Filter(FalseLiteral)`. + * + * Moreover, this rule also transforms predicates in all [[If]] expressions as well as branch + * conditions in all [[CaseWhen]] expressions, even if they are not part of the search conditions. + * + * For example, `Project(If(And(cond, Literal(null)), Literal(1), Literal(2)))` can be simplified + * into `Project(Literal(2))`. + */ +object ReplaceNullWithFalseInPredicate extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond)) +case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond))) +case p: LogicalPlan => p transformExpressions { + case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred)) + case cw @ CaseWhen(branches, _) => +val newBranches = branches.map { case (cond, value) => + replaceNullWithFalse(cond) -> value +} +cw.copy(branches = newBranches) + case af @ ArrayFilter(_, lf @ LambdaFunction(func, _, _)) => +val newLambda = lf.copy(function = replaceNullWithFalse(func)) +af.copy(function = newLambda) + case ae @ ArrayExists(_, lf @ LambdaFunction(func, _, _)) => +val newLambda = lf.copy(function = replaceNullWithFalse(func)) +ae.copy(function = newLambda) + case mf @ MapFilter(_, lf @ LambdaFunction(func, _, _)) => +val newLambda = lf.copy(function = replaceNullWithFalse(func)) +mf.copy(function = newLambda) +} + } + + /** + * Recursively traverse the 
Boolean-type expression to replace + * `Literal(null, BooleanType)` with `FalseLiteral`, if possible. + * + * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit + * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or + * `Literal(null, BooleanType)`. + */ + private def replaceNullWithFalse(e: Expression): Expression = { +if (e.dataType != BooleanType) { --- End diff -- We don't
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23130#discussion_r236149203 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -388,7 +388,7 @@ case class FileSourceScanExec( logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") val filesGroupedToBuckets = selectedPartitions.flatMap { p => -p.files.map { f => +p.files.filter(_.getLen > 0).map { f => --- End diff -- do you mean changing `filter...map...` to `flatMap`? I don't have a strong preference about it. The updated test cases and the new test case are for this change.
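For readers comparing the two styles mentioned in the comment above, a minimal sketch on plain Scala collections follows. `FileInfo` is a hypothetical stand-in for a file status object (only a path and a length), and neither method is taken from the PR.

```scala
object EmptyFileFilterSketch {
  // Hypothetical stand-in for a file status; only path and length matter here.
  final case class FileInfo(path: String, len: Long)

  // Style used in the diff: drop empty files first, then map the rest.
  def viaFilterMap(files: Seq[FileInfo]): Seq[String] =
    files.filter(_.len > 0).map(_.path)

  // Alternative style: one flatMap that emits nothing for an empty file.
  def viaFlatMap(files: Seq[FileInfo]): Seq[String] =
    files.flatMap(f => if (f.len > 0) Some(f.path) else None)
}
```

The two produce the same sequence, so the choice is mostly one of readability, which matches the "no strong preference" stance in the comment.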
[GitHub] spark issue #23135: [SPARK-26168][SQL] Update the code comments in Expressio...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23135 thanks, merging to master!
[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter's readingIterator fie...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23083 retest this please
[GitHub] spark issue #23137: [SPARK-26169] Create DataFrameSetOperationsSuite
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23137 thanks, merging to master!
[GitHub] spark pull request #23141: [SPARK-26021][SQL][followup] add test for special...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23141#discussion_r236143942 --- Diff: docs/sql-migration-guide-upgrade.md --- @@ -17,14 +17,16 @@ displayTitle: Spark SQL Upgrading Guide - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`. - - In Spark version 2.4 and earlier, the `from_json` function produces `null`s for JSON strings and JSON datasource skips the same independetly of its mode if there is no valid root JSON token in its input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if specified schema is `key STRING, value INT`. + - In Spark version 2.4 and earlier, the `from_json` function produces `null`s for JSON strings and JSON datasource skips the same independetly of its mode if there is no valid root JSON token in its input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if specified schema is `key STRING, value INT`. - The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set. - In Spark version 2.4 and earlier, users can create map values with map type key via built-in function like `CreateMap`, `MapFromArrays`, etc. Since Spark 3.0, it's not allowed to create map values with map type key with these built-in functions. 
Users can still read map values with map type key from data source or Java/Scala collections, though they are not very useful. - + - In Spark version 2.4 and earlier, `Dataset.groupByKey` results to a grouped dataset with key attribute wrongly named as "value", if the key is non-struct type, e.g. int, string, array, etc. This is counterintuitive and makes the schema of aggregation queries weird. For example, the schema of `ds.groupByKey(...).count()` is `(value, count)`. Since Spark 3.0, we name the grouping attribute to "key". The old behaviour is preserved under a newly added configuration `spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue` with a default value of `false`. + - In Spark version 2.4 and earlier, float/double -0.0 is semantically equal to 0.0, but users can still distinguish them via `Dataset.show`, `Dataset.collect` etc. Since Spark 3.0, float/double -0.0 is replaced by 0.0 internally, and users can't distinguish them any more. --- End diff -- I checked presto and postgres, the behaviors are the same. Hive distinguishes -0.0 and 0.0, but it has the group by bug.
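The -0.0 versus 0.0 distinction discussed above can be seen on the JVM without Spark. The sketch below is an illustration only: `NegativeZeroSketch` and `normalize` are hypothetical names, and the normalization shown is one simple way to fold -0.0 into 0.0, not necessarily how Spark does it internally.

```scala
object NegativeZeroSketch {
  // Primitive comparison follows IEEE 754, under which -0.0 equals 0.0.
  def primitiveEqual(a: Double, b: Double): Boolean = a == b

  // The raw bit patterns differ, because -0.0 has the sign bit set.
  def sameBits(a: Double, b: Double): Boolean =
    java.lang.Double.doubleToLongBits(a) == java.lang.Double.doubleToLongBits(b)

  // Maps both zeros to positive 0.0 and leaves every other value unchanged.
  def normalize(d: Double): Double = if (d == 0.0d) 0.0d else d
}
```

With these helpers, `primitiveEqual(-0.0, 0.0)` holds while `sameBits(-0.0, 0.0)` does not, which is the kind of gap that lets users tell the two values apart when results are displayed or collected.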
[GitHub] spark issue #23141: [SPARK-26021][SQL][followup] add test for special floati...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23141 cc @adoron @kiszk @viirya
[GitHub] spark pull request #23141: [SPARK-26021][SQL][followup] add test for special...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/23141 [SPARK-26021][SQL][followup] add test for special floating point values ## What changes were proposed in this pull request? a followup of https://github.com/apache/spark/pull/23124 . Add a test to show the minor behavior change introduced by #23124 , and add migration guide. ## How was this patch tested? a new test You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark follow Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23141.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23141 commit 8a9103c47931eb61cb329ece046d5efc50e855c2 Author: Wenchen Fan Date: 2018-11-26T06:11:09Z add test for special floating point values
[GitHub] spark pull request #23104: [SPARK-26138][SQL] LimitPushDown cross join requi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23104#discussion_r236118983 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -459,6 +459,7 @@ object LimitPushDown extends Rule[LogicalPlan] { val newJoin = joinType match { case RightOuter => join.copy(right = maybePushLocalLimit(exp, right)) case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left)) +case Cross => join.copy(left = maybePushLocalLimit(exp, left), right = maybePushLocalLimit(exp, right)) --- End diff -- inner join without condition is literally cross join.
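As a rough model of why a limit can be pushed to both sides of a cross join (or of an inner join with no condition, which produces the same Cartesian product), consider the sketch below on ordered Scala sequences. `CrossJoinLimitSketch` and its methods are hypothetical; a real SQL engine gives no ordering guarantee, so there only the row count, not the exact rows, is comparable.

```scala
object CrossJoinLimitSketch {
  // Cartesian product of two sequences: the collection analogue of a cross
  // join, or of an inner join that has no join condition.
  def cross[A, B](left: Seq[A], right: Seq[B]): Seq[(A, B)] =
    for (l <- left; r <- right) yield (l, r)

  // Limit applied only after the full product has been formed.
  def limitAfter[A, B](left: Seq[A], right: Seq[B], n: Int): Seq[(A, B)] =
    cross(left, right).take(n)

  // Limit pushed to both inputs first, then applied again on top.
  def limitPushed[A, B](left: Seq[A], right: Seq[B], n: Int): Seq[(A, B)] =
    cross(left.take(n), right.take(n)).take(n)
}
```

Taking at most `n` rows from each side still leaves enough pairs to fill the outer limit whenever the unlimited join would have, which is the property the added `Cross` case in the diff relies on.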
[GitHub] spark pull request #23139: [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule Repla...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23139#discussion_r236118914 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, ArrayFilter, CaseWhen, Expression, If} +import org.apache.spark.sql.catalyst.expressions.{LambdaFunction, Literal, MapFilter, Or} +import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.BooleanType + + +/** + * A rule that replaces `Literal(null, BooleanType)` with `FalseLiteral`, if possible, in the search + * condition of the WHERE/HAVING/ON(JOIN) clauses, which contain an implicit Boolean operator + * "(search condition) = TRUE". The replacement is only valid when `Literal(null, BooleanType)` is + * semantically equivalent to `FalseLiteral` when evaluating the whole search condition. 
+ * + * Please note that FALSE and NULL are not exchangeable in most cases, when the search condition + * contains NOT and NULL-tolerant expressions. Thus, the rule is very conservative and applicable + * in very limited cases. + * + * For example, `Filter(Literal(null, BooleanType))` is equal to `Filter(FalseLiteral)`. + * + * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`; + * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually + * `Filter(FalseLiteral)`. + * + * Moreover, this rule also transforms predicates in all [[If]] expressions as well as branch + * conditions in all [[CaseWhen]] expressions, even if they are not part of the search conditions. + * + * For example, `Project(If(And(cond, Literal(null)), Literal(1), Literal(2)))` can be simplified + * into `Project(Literal(2))`. + */ +object ReplaceNullWithFalseInPredicate extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond)) +case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond))) +case p: LogicalPlan => p transformExpressions { + case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred)) + case cw @ CaseWhen(branches, _) => +val newBranches = branches.map { case (cond, value) => + replaceNullWithFalse(cond) -> value +} +cw.copy(branches = newBranches) + case af @ ArrayFilter(_, lf @ LambdaFunction(func, _, _)) => +val newLambda = lf.copy(function = replaceNullWithFalse(func)) +af.copy(function = newLambda) + case ae @ ArrayExists(_, lf @ LambdaFunction(func, _, _)) => +val newLambda = lf.copy(function = replaceNullWithFalse(func)) +ae.copy(function = newLambda) + case mf @ MapFilter(_, lf @ LambdaFunction(func, _, _)) => +val newLambda = lf.copy(function = replaceNullWithFalse(func)) +mf.copy(function = newLambda) +} + } + + /** + * Recursively traverse the 
Boolean-type expression to replace + * `Literal(null, BooleanType)` with `FalseLiteral`, if possible. + * + * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit + * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or + * `Literal(null, BooleanType)`. + */ + private def replaceNullWithFalse(e: Expression): Expression = { +if (e.dataType != BooleanType) { --- End diff -- do we nee
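The traversal described in the quoted Scaladoc can be illustrated with a toy expression tree. This is only a sketch: the types below (`Expr`, `NullLit`, `Attr`, and so on) are hypothetical stand-ins, not Spark's Catalyst classes, and the real rule also handles `CaseWhen` and checks the data type. It shows why the descent stops at anything other than `And`, `Or`, `If`, or the null literal.

```scala
// Toy expression tree. These types are illustrative and are NOT Spark's Catalyst classes.
sealed trait Expr
case object NullLit extends Expr                   // stands in for Literal(null, BooleanType)
case object FalseLit extends Expr                  // stands in for FalseLiteral
final case class Attr(name: String) extends Expr   // an opaque Boolean column
final case class And(left: Expr, right: Expr) extends Expr
final case class Or(left: Expr, right: Expr) extends Expr
final case class Not(child: Expr) extends Expr
final case class If(pred: Expr, whenTrue: Expr, whenFalse: Expr) extends Expr

object NullToFalse {
  // Descend only through And/Or/If. Any other node (for example Not) is returned
  // untouched, because below it NULL and FALSE are no longer interchangeable.
  def replace(e: Expr): Expr = e match {
    case NullLit     => FalseLit
    case And(l, r)   => And(replace(l), replace(r))
    case Or(l, r)    => Or(replace(l), replace(r))
    case If(p, t, f) => If(replace(p), replace(t), replace(f))
    case other       => other
  }
}
```

With this sketch, `If(cond, FalseLit, NullLit)` becomes `If(cond, FalseLit, FalseLit)`, matching the example in the Scaladoc, while `Not(NullLit)` is left alone.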
[GitHub] spark issue #23135: [SPARK-26168][SQL] Update the code comments in Expressio...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23135 LGTM
[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and RDDScan...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23127 retest this please
[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and RDDScan...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23127 LGTM
[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23127#discussion_r236118569 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -350,6 +350,15 @@ trait CodegenSupport extends SparkPlan { */ def needStopCheck: Boolean = parent.needStopCheck + /** + * Helper default should stop check code. + */ + def shouldStopCheckCode: String = if (needStopCheck) { --- End diff -- we can use it in more places. This can be done in a followup.
[GitHub] spark pull request #23104: [SPARK-26138][SQL] LimitPushDown cross join requi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23104#discussion_r236114831 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -459,6 +459,7 @@ object LimitPushDown extends Rule[LogicalPlan] { val newJoin = joinType match { case RightOuter => join.copy(right = maybePushLocalLimit(exp, right)) case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left)) +case Cross => join.copy(left = maybePushLocalLimit(exp, left), right = maybePushLocalLimit(exp, right)) --- End diff -- maybe we can match `InnerLike` when condition is empty.
[GitHub] spark pull request #23104: [SPARK-26138][SQL] LimitPushDown cross join requi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23104#discussion_r236114751 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -459,6 +459,7 @@ object LimitPushDown extends Rule[LogicalPlan] { val newJoin = joinType match { case RightOuter => join.copy(right = maybePushLocalLimit(exp, right)) case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left)) +case Cross => join.copy(left = maybePushLocalLimit(exp, left), right = maybePushLocalLimit(exp, right)) --- End diff -- how about inner join without condition?
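The two review comments above suggest treating an inner join with no condition like a cross join when pushing a limit down. A minimal sketch of that idea on a toy plan follows. All types here (`Plan`, `Relation`, `LocalLimit`, `Join`, the join-type objects) are hypothetical simplifications that only borrow names from the diff, and the join condition is modelled as an optional string purely for illustration.

```scala
// Toy logical plan. Names mirror the diff, but these are NOT Spark's classes.
sealed trait JoinType
case object Inner extends JoinType
case object Cross extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType

sealed trait Plan
final case class Relation(name: String) extends Plan
final case class LocalLimit(n: Int, child: Plan) extends Plan
final case class Join(left: Plan, right: Plan, joinType: JoinType, condition: Option[String])
    extends Plan

object LimitPushDownSketch {
  // Add a limit unless the child is already limited at least as tightly.
  private def maybePush(n: Int, p: Plan): Plan = p match {
    case LocalLimit(m, _) if m <= n => p
    case _                          => LocalLimit(n, p)
  }

  def pushThroughJoin(n: Int, j: Join): Join = j match {
    case Join(l, _, LeftOuter, _)  => j.copy(left = maybePush(n, l))
    case Join(_, r, RightOuter, _) => j.copy(right = maybePush(n, r))
    // Without a condition, an inner join is a cartesian product, so each side
    // needs at most n rows to produce n output rows.
    case Join(l, r, Inner | Cross, None) =>
      j.copy(left = maybePush(n, l), right = maybePush(n, r))
    // With a condition, a limited side might lose exactly the rows that match.
    case _ => j
  }
}
```

The guard on `None` is the important part: once a join condition exists, limiting either input can change the result.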
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r236112390 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -646,34 +633,35 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres val mapMerge = s""" -|${ev.isNull} = $hasNullName; -|if (!${ev.isNull}) { -| $arrayDataClass[] $keyArgsName = new $arrayDataClass[${mapCodes.size}]; -| $arrayDataClass[] $valArgsName = new $arrayDataClass[${mapCodes.size}]; -| long $numElementsName = 0; -| for (int $idxName = 0; $idxName < $argsName.length; $idxName++) { -|$keyArgsName[$idxName] = $argsName[$idxName].keyArray(); -|$valArgsName[$idxName] = $argsName[$idxName].valueArray(); -|$numElementsName += $argsName[$idxName].numElements(); -| } -| if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) { -|throw new RuntimeException("Unsuccessful attempt to concat maps with " + -| $numElementsName + " elements due to exceeding the map size limit " + -| "${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}."); -| } -| $arrayDataClass $finKeysName = $keyConcat($keyArgsName, -|(int) $numElementsName); -| $arrayDataClass $finValsName = $valueConcat($valArgsName, -|(int) $numElementsName); -| ${ev.value} = new $arrayBasedMapDataClass($finKeysName, $finValsName); +|ArrayData[] $keyArgsName = new ArrayData[${mapCodes.size}]; +|ArrayData[] $valArgsName = new ArrayData[${mapCodes.size}]; +|long $numElementsName = 0; +|for (int $idxName = 0; $idxName < $argsName.length; $idxName++) { +| $keyArgsName[$idxName] = $argsName[$idxName].keyArray(); +| $valArgsName[$idxName] = $argsName[$idxName].valueArray(); +| $numElementsName += $argsName[$idxName].numElements(); |} +|if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) { --- End diff -- yup. I actually did what you proposed at first, and then realized it's different from before and may introduce perf regression. 
We can investigate it in a followup.
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r236112256 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres override def nullable: Boolean = children.exists(_.nullable) + private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType) + override def eval(input: InternalRow): Any = { -val maps = children.map(_.eval(input)) +val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray --- End diff -- BTW, if it's not true anymore with Scala 2.12, we should update them together with a benchmark, instead of only updating this single one.
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r236112177 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres override def nullable: Boolean = children.exists(_.nullable) + private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType) + override def eval(input: InternalRow): Any = { -val maps = children.map(_.eval(input)) +val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray --- End diff -- In Scala, a while loop is faster than `foreach`. If you look at the `Expression.eval` implementations, we use while loops a lot, even where `foreach` would produce simpler code.
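To make the two styles in the comment concrete, here is a small self-contained comparison. `LoopStyles` and its methods are hypothetical names for illustration. The sketch only shows the shape of each loop and makes no measurement, so any performance difference would still need a benchmark, as the neighbouring comment in this thread also notes.

```scala
object LoopStyles {
  // Closure-based style: shorter, but every element is passed to a function value.
  def sumForeach(xs: Array[Long]): Long = {
    var total = 0L
    xs.foreach(x => total += x)
    total
  }

  // Index-based while loop: more verbose, with no function value involved.
  def sumWhile(xs: Array[Long]): Long = {
    var total = 0L
    var i = 0
    while (i < xs.length) {
      total += xs(i)
      i += 1
    }
    total
  }
}
```

Both methods return the same result; the trade-off discussed in the review is readability against per-row overhead on a hot path.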
[GitHub] spark pull request #21732: [SPARK-24762][SQL] Enable Option of Product encod...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21732#discussion_r236111929 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala --- @@ -253,10 +247,24 @@ case class ExpressionEncoder[T]( }) /** - * Returns true if the type `T` is serialized as a struct. + * Returns true if the type `T` is serialized as a struct by `objSerializer`. */ def isSerializedAsStruct: Boolean = objSerializer.dataType.isInstanceOf[StructType] + /** + * Returns true if the type `T` is an `Option` type. + */ + def isOptionType: Boolean = classOf[Option[_]].isAssignableFrom(clsTag.runtimeClass) + + /** + * If the type `T` is serialized as a struct, when it is encoded to a Spark SQL row, fields in + * the struct are naturally mapped to top-level columns in a row. In other words, the serialized + * struct is flattened to row. But in case of the `T` is also an `Option` type, it can't be + * flattened to top-level row, because in Spark SQL top-level row can't be null. This method + * returns true if `T` is serialized as struct and is not `Option` type. + */ + def isSerializedAsStructForTopLevel: Boolean = isSerializedAsStruct && !isOptionType --- End diff -- can you send a followup PR to inline `isOptionType` if it's only used here?
[GitHub] spark issue #21732: [SPARK-24762][SQL] Enable Option of Product encoders
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21732 thanks, merging to master, great work!
[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter's readingIterator fie...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23083 LGTM, thanks for your great work!
[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter's readingIterator fie...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23083 ok to test
[GitHub] spark issue #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23130 The code change LGTM. There is a mistake in the PR description: we updated `FileSourceScanExec`, not `DataSourceScanExec`. Let's also mention that this fixes a behavior change that https://github.com/apache/spark/pull/22938 introduced by mistake.
[GitHub] spark issue #22512: [SPARK-25498][SQL] InterpretedMutableProjection should h...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22512 @maropu are you still working on it?
[GitHub] spark issue #23137: [SPARK-26169] Create DataFrameSetOperationsSuite
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23137 LGTM, waiting for the conflicts to be resolved.
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r236108610 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -82,6 +82,14 @@ object SQLMetrics { private val baseForAvgMetric: Int = 10 + val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched" + val LOCAL_BLOCKS_FETCHED = "localBlocksFetched" + val REMOTE_BYTES_READ = "remoteBytesRead" + val REMOTE_BYTES_READ_TO_DISK = "remoteBytesReadToDisk" + val LOCAL_BYTES_READ = "localBytesRead" + val FETCH_WAIT_TIME = "fetchWaitTime" + val RECORDS_READ = "recordsRead" --- End diff -- Is there an easy way to sync this list with `ShuffleReadMetrics` instead of doing it manually?
[GitHub] spark issue #23131: [SPARK-25908][SQL][FOLLOW-UP] Add back unionAll
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23131 Shall we say `union` is an alias of `unionAll`, instead of saying `unionAll` is an alias of `Union`? According to the SQL spec, `unionAll` is implemented correctly in that it keeps duplicated rows, while `union` does not follow the SQL spec; it is too widely used and it is too late to change its behavior.
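The distinction the comment draws can be shown on plain Scala collections, without any Spark dependency. `UnionSemantics` and its two methods are hypothetical names: `unionAll` models SQL's UNION ALL, which per the comment is also how the Dataset `union` behaves, and `unionDistinct` models what SQL's plain UNION specifies.

```scala
object UnionSemantics {
  // SQL UNION ALL: concatenate both inputs and keep duplicated rows.
  def unionAll[A](a: Seq[A], b: Seq[A]): Seq[A] = a ++ b

  // SQL UNION (that is, UNION DISTINCT): concatenate, then drop duplicated rows.
  def unionDistinct[A](a: Seq[A], b: Seq[A]): Seq[A] = (a ++ b).distinct
}
```

For inputs `Seq(1, 2)` and `Seq(2, 3)`, the first keeps both copies of `2` and the second keeps one.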
[GitHub] spark pull request #23135: [SPARK-26168][SQL] Update the code comments in Ex...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23135#discussion_r236106495 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -43,9 +43,24 @@ import org.apache.spark.sql.types._ * There are a few important traits: * * - [[Nondeterministic]]: an expression that is not deterministic. + * - [[Stateful]]: an expression that contains mutable state. For example, MonotonicallyIncreasingID + * and Rand. A stateful expression is always non-deterministic. * - [[Unevaluable]]: an expression that is not supposed to be evaluated. * - [[CodegenFallback]]: an expression that does not have code gen implemented and falls back to *interpreted mode. + * - [[NullIntolerant]]: an expression that is null intolerant (i.e. any null input will result in + * null output). + * - [[NonSQLExpression]]: a common base trait for the expressions that doesn't have SQL + * expressions like representation. For example, `ScalaUDF`, `ScalaUDAF`, + * and object `MapObjects` and `Invoke`. + * - [[UserDefinedExpression]]: a common base trait for user-defined functions, including + * UDF/UDAF/UDTF. + * - [[HigherOrderFunction]]: a common base trait for higher order functions that take one or more + *(lambda) functions and applies these to some objects. The function + *produces a number of variables which can be consumed by some lambda + *function. + * - [[NamedExpression]]: An [[Expression]] that is named. + * - [[TimeZoneAwareExpression]]: A common base trait for time zone aware expressions. --- End diff -- shall we also mention `SubqueryExpression`?
[GitHub] spark issue #23084: [SPARK-26117][CORE][SQL]use SparkOutOfMemoryError instea...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23084 thanks, merging to master!
[GitHub] spark issue #23084: [SPARK-26117][CORE][SQL]use SparkOutOfMemoryError instea...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23084 I think it's safer to only catch the spark-thrown OOM, not the system OOM, so LGTM
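The point about catching only the engine-thrown OOM can be sketched with a dedicated subclass of `java.lang.OutOfMemoryError`. `EngineOutOfMemoryError` and `OomHandling` are hypothetical names defined here so the example is self-contained; they stand in for the Spark-specific error the PR title refers to and are not Spark's API.

```scala
// Hypothetical stand-in for an OOM raised by the engine's own memory manager.
final class EngineOutOfMemoryError(msg: String) extends OutOfMemoryError(msg)

object OomHandling {
  // Recover only when the engine itself reported memory exhaustion.
  // A plain OutOfMemoryError from the JVM is not matched and keeps propagating.
  def runWithFallback[A](body: => A)(fallback: => A): A =
    try body
    catch {
      case _: EngineOutOfMemoryError => fallback
    }
}
```

Matching on the subclass keeps the handler from swallowing a genuine JVM heap exhaustion, after which the process state may not be trustworthy.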
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r235929748 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType} + +/** + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes + * duplicated map keys w.r.t. the last wins policy. + */ +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable { + assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map") + + private lazy val keyToIndex = keyType match { +case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int] +case _ => + // for complex types, use interpreted ordering to be able to compare unsafe data with safe + // data, e.g. UnsafeRow vs GenericInternalRow. 
+ mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType)) --- End diff -- I think we should fail it at the analyzer phase, and other map-producing functions should do it as well. Can you create a JIRA for it? thanks!
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r235929210 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType} + +/** + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes + * duplicated map keys w.r.t. the last wins policy. + */ +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable { + assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map") + + private lazy val keyToIndex = keyType match { +case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int] +case _ => + // for complex types, use interpreted ordering to be able to compare unsafe data with safe + // data, e.g. UnsafeRow vs GenericInternalRow. 
+ mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType)) + } + + // TODO: specialize it + private lazy val keys = mutable.ArrayBuffer.empty[Any] + private lazy val values = mutable.ArrayBuffer.empty[Any] + + private lazy val keyGetter = InternalRow.getAccessor(keyType) + private lazy val valueGetter = InternalRow.getAccessor(valueType) + + def reset(): Unit = { +keyToIndex.clear() +keys.clear() +values.clear() + } + + def put(key: Any, value: Any): Unit = { +if (key == null) { + throw new RuntimeException("Cannot use null as map key.") +} + +val maybeExistingIdx = keyToIndex.get(key) +if (maybeExistingIdx.isDefined) { + // Overwrite the previous value, as the policy is last wins. + values(maybeExistingIdx.get) = value +} else { + keyToIndex.put(key, values.length) + keys.append(key) + values.append(value) +} + } + + // write a 2-field row, the first field is key and the second field is value. + def put(entry: InternalRow): Unit = { +if (entry.isNullAt(0)) { + throw new RuntimeException("Cannot use null as map key.") +} +put(keyGetter(entry, 0), valueGetter(entry, 1)) + } + + def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = { --- End diff -- ah good catch!
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r235928954 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType} + +/** + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes + * duplicated map keys w.r.t. the last wins policy. + */ +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable { + assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map") + + private lazy val keyToIndex = keyType match { +case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int] +case _ => + // for complex types, use interpreted ordering to be able to compare unsafe data with safe + // data, e.g. UnsafeRow vs GenericInternalRow. 
+ mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType)) + } + + // TODO: specialize it + private lazy val keys = mutable.ArrayBuffer.empty[Any] + private lazy val values = mutable.ArrayBuffer.empty[Any] + + private lazy val keyGetter = InternalRow.getAccessor(keyType) + private lazy val valueGetter = InternalRow.getAccessor(valueType) + + def reset(): Unit = { +keyToIndex.clear() +keys.clear() +values.clear() + } + + def put(key: Any, value: Any): Unit = { +if (key == null) { + throw new RuntimeException("Cannot use null as map key.") +} + +val maybeExistingIdx = keyToIndex.get(key) +if (maybeExistingIdx.isDefined) { + // Overwrite the previous value, as the policy is last wins. + values(maybeExistingIdx.get) = value +} else { + keyToIndex.put(key, values.length) + keys.append(key) + values.append(value) +} + } + + // write a 2-field row, the first field is key and the second field is value. + def put(entry: InternalRow): Unit = { +if (entry.isNullAt(0)) { --- End diff -- Two of the put methods have this null check, and all the other put methods go through them.
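The last-wins policy and the "all other put methods go through the checked ones" structure can be sketched in a few lines. `LastWinsMapBuilder` is a hypothetical, generic simplification of the builder in the diff: it uses a plain hash map for every key type and returns Scala lists rather than Spark's map data, so it does not cover the ordering-based lookup the diff uses for complex keys.

```scala
import scala.collection.mutable

// Simplified, generic stand-in for the builder in the diff. NOT Spark's class.
final class LastWinsMapBuilder[K, V] {
  private val keyToIndex = mutable.HashMap.empty[K, Int]
  private val keys = mutable.ArrayBuffer.empty[K]
  private val values = mutable.ArrayBuffer.empty[V]

  // The single place that rejects null keys and applies the last-wins policy.
  def put(key: K, value: V): Unit = {
    if (key == null) throw new RuntimeException("Cannot use null as map key.")
    keyToIndex.get(key) match {
      case Some(idx) => values(idx) = value // key seen before: overwrite in place
      case None =>
        keyToIndex.put(key, values.length)
        keys += key
        values += value
    }
  }

  // Bulk insert delegates to put, so it inherits the null check for free.
  def putAll(ks: IndexedSeq[K], vs: IndexedSeq[V]): Unit = {
    require(ks.length == vs.length, "keys and values must have the same length")
    var i = 0
    while (i < ks.length) {
      put(ks(i), vs(i))
      i += 1
    }
  }

  // Return the deduplicated entries in first-insertion order, then reset.
  def build(): (List[K], List[V]) = {
    val result = (keys.toList, values.toList)
    keyToIndex.clear()
    keys.clear()
    values.clear()
    result
  }
}
```

Because a repeated key overwrites the value at its original index, the key keeps its first position while taking the last value.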
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r235928588 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -646,34 +633,35 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres val mapMerge = s""" -|${ev.isNull} = $hasNullName; -|if (!${ev.isNull}) { -| $arrayDataClass[] $keyArgsName = new $arrayDataClass[${mapCodes.size}]; -| $arrayDataClass[] $valArgsName = new $arrayDataClass[${mapCodes.size}]; -| long $numElementsName = 0; -| for (int $idxName = 0; $idxName < $argsName.length; $idxName++) { -|$keyArgsName[$idxName] = $argsName[$idxName].keyArray(); -|$valArgsName[$idxName] = $argsName[$idxName].valueArray(); -|$numElementsName += $argsName[$idxName].numElements(); -| } -| if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) { -|throw new RuntimeException("Unsuccessful attempt to concat maps with " + -| $numElementsName + " elements due to exceeding the map size limit " + -| "${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}."); -| } -| $arrayDataClass $finKeysName = $keyConcat($keyArgsName, -|(int) $numElementsName); -| $arrayDataClass $finValsName = $valueConcat($valArgsName, -|(int) $numElementsName); -| ${ev.value} = new $arrayBasedMapDataClass($finKeysName, $finValsName); +|ArrayData[] $keyArgsName = new ArrayData[${mapCodes.size}]; +|ArrayData[] $valArgsName = new ArrayData[${mapCodes.size}]; +|long $numElementsName = 0; +|for (int $idxName = 0; $idxName < $argsName.length; $idxName++) { +| $keyArgsName[$idxName] = $argsName[$idxName].keyArray(); +| $valArgsName[$idxName] = $argsName[$idxName].valueArray(); +| $numElementsName += $argsName[$idxName].numElements(); |} +|if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) { --- End diff -- This check is done before the `putAll`, so that it can fail fast. 
I think it's fine to ignore duplicated keys here, to make it more conservative.
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r235927895 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres override def nullable: Boolean = children.exists(_.nullable) + private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType) + override def eval(input: InternalRow): Any = { -val maps = children.map(_.eval(input)) +val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray --- End diff -- I need to access it by index below, so I turn it into an array to guarantee that the access is O(1).
[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter Leak
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23083 Looking at the code, we are trying to fix 2 memory leaks: the task completion listener in `ShuffleBlockFetcherIterator`, and the `CompletionIterator`. If that's the case, can you say that in the PR description? For the task completion listener, I think it's overkill to introduce a new API. Do we know exactly where we leak the memory? And can we null it out when the `ShuffleBlockFetcherIterator` reaches its end?
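The "null it out when the iterator reaches its end" suggestion can be sketched as a small wrapper. `ReleasingIterator` is a hypothetical class written for this illustration; it is not Spark's `CompletionIterator` or `ShuffleBlockFetcherIterator`, and it only shows the general pattern of dropping the reference once the source is exhausted.

```scala
// Hypothetical wrapper illustrating the pattern. NOT Spark's CompletionIterator.
final class ReleasingIterator[A](private var underlying: Iterator[A], onComplete: () => Unit)
    extends Iterator[A] {
  private var completed = false

  override def hasNext: Boolean =
    if (completed) false
    else if (underlying.hasNext) true
    else {
      completed = true
      underlying = null // drop the reference so the source can be garbage collected
      onComplete()      // runs exactly once, on the first exhausted check
      false
    }

  override def next(): A =
    if (hasNext) underlying.next()
    else throw new NoSuchElementException("next on exhausted iterator")

  // Exposed only so the sketch can be checked.
  def isReleased: Boolean = underlying == null
}
```

Anything that still holds the wrapper after exhaustion, such as a long-lived listener, then retains only the small wrapper object rather than the source and its buffers.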
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r235849825 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -751,171 +739,46 @@ case class MapFromEntries(child: Expression) extends UnaryExpression { s"${child.dataType.catalogString} type. $prettyName accepts only arrays of pair structs.") } + private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType) + override protected def nullSafeEval(input: Any): Any = { -val arrayData = input.asInstanceOf[ArrayData] -val numEntries = arrayData.numElements() +val entries = input.asInstanceOf[ArrayData] +val numEntries = entries.numElements() var i = 0 -if(nullEntries) { +if (nullEntries) { while (i < numEntries) { -if (arrayData.isNullAt(i)) return null +if (entries.isNullAt(i)) return null i += 1 } } -val keyArray = new Array[AnyRef](numEntries) -val valueArray = new Array[AnyRef](numEntries) + +mapBuilder.reset() i = 0 while (i < numEntries) { - val entry = arrayData.getStruct(i, 2) - val key = entry.get(0, dataType.keyType) - if (key == null) { -throw new RuntimeException("The first field from a struct (key) can't be null.") - } - keyArray.update(i, key) - val value = entry.get(1, dataType.valueType) - valueArray.update(i, value) + mapBuilder.put(entries.getStruct(i, 2)) i += 1 } -ArrayBasedMapData(keyArray, valueArray) +mapBuilder.build() } override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { nullSafeCodeGen(ctx, ev, c => { val numEntries = ctx.freshName("numEntries") - val isKeyPrimitive = CodeGenerator.isPrimitiveType(dataType.keyType) - val isValuePrimitive = CodeGenerator.isPrimitiveType(dataType.valueType) - val code = if (isKeyPrimitive && isValuePrimitive) { -genCodeForPrimitiveElements(ctx, c, ev.value, numEntries) --- End diff -- since we need to check duplicated map keys, it's not possible to apply this trick anymore, as 
we need to overwrite values if the key appears before. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r235849697 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala --- @@ -125,22 +125,36 @@ object InternalRow { * actually takes a `SpecializedGetters` input because it can be generalized to other classes * that implements `SpecializedGetters` (e.g., `ArrayData`) too. */ - def getAccessor(dataType: DataType): (SpecializedGetters, Int) => Any = dataType match { -case BooleanType => (input, ordinal) => input.getBoolean(ordinal) -case ByteType => (input, ordinal) => input.getByte(ordinal) -case ShortType => (input, ordinal) => input.getShort(ordinal) -case IntegerType | DateType => (input, ordinal) => input.getInt(ordinal) -case LongType | TimestampType => (input, ordinal) => input.getLong(ordinal) -case FloatType => (input, ordinal) => input.getFloat(ordinal) -case DoubleType => (input, ordinal) => input.getDouble(ordinal) -case StringType => (input, ordinal) => input.getUTF8String(ordinal) -case BinaryType => (input, ordinal) => input.getBinary(ordinal) -case CalendarIntervalType => (input, ordinal) => input.getInterval(ordinal) -case t: DecimalType => (input, ordinal) => input.getDecimal(ordinal, t.precision, t.scale) -case t: StructType => (input, ordinal) => input.getStruct(ordinal, t.size) -case _: ArrayType => (input, ordinal) => input.getArray(ordinal) -case _: MapType => (input, ordinal) => input.getMap(ordinal) -case u: UserDefinedType[_] => getAccessor(u.sqlType) -case _ => (input, ordinal) => input.get(ordinal, dataType) + def getAccessor(dt: DataType, nullable: Boolean = true): (SpecializedGetters, Int) => Any = { --- End diff -- I can move it to a new PR if others think it's necessary. It's a little dangerous to ask the caller side to take care of null values. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23124 cc @dongjoon-hyun @gatorsmile @viirya @kiszk @mgaido91
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/23124

[SPARK-25829][SQL] remove duplicated map keys with last wins policy

## What changes were proposed in this pull request?

Currently duplicated map keys are not handled consistently. For example, map lookup respects the duplicated key that appears first, `Dataset.collect` only keeps the duplicated key that appears last, `MapKeys` returns duplicated keys, etc.

This PR proposes to remove duplicated map keys with a last-wins policy, to follow Java/Scala and Presto. It only applies to built-in functions, as users can create maps with duplicated keys via private APIs anyway.

For other places:
1. data source v1 doesn't have this problem, as users need to provide a java/scala map, which can't have duplicated keys.
2. data source v2 may have this problem. I've added a note to `ArrayBasedMapData` to ask the caller to take care of duplicated keys. In the future we should enforce it in the stable data APIs for data source v2.
3. UDF doesn't have this problem, as users need to provide a java/scala map. Same as data source v1.
4. file format. I checked all of them and only parquet does not enforce it. For backward compatibility reasons I change nothing but leave a note saying that the behavior will be undefined if users write maps with duplicated keys to parquet files. Maybe we can add a config and fail by default if parquet files have maps with duplicated keys. This can be done in a followup.

## How was this patch tested?

updated tests and new tests

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/cloud-fan/spark map
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23124.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23124

commit cbcd5d7a937f8120ef8527f1f26150ed93f1de0a
Author: Wenchen Fan
Date: 2018-11-15T02:49:22Z
remove duplicated map keys with last wins policy
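The last-wins policy described in this pull request can be illustrated with a minimal Java sketch. This is not Spark's `ArrayBasedMapBuilder`; the class `LastWinsMapSketch` and its `build` method are hypothetical names, and rejecting null keys is an assumption that mirrors the null-key check visible in the old `MapFromEntries` code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not part of Spark.
final class LastWinsMapSketch {

    /**
     * Builds a map from parallel key/value lists. When a key repeats,
     * the later value overwrites the earlier one (last wins), while the
     * key keeps the position of its first appearance.
     */
    static <K, V> Map<K, V> build(List<K> keys, List<V> values) {
        if (keys.size() != values.size()) {
            throw new IllegalArgumentException("keys and values must have the same length");
        }
        Map<K, V> result = new LinkedHashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            K key = keys.get(i);
            if (key == null) {
                // Assumption: null keys are rejected, as in the old MapFromEntries code.
                throw new IllegalArgumentException("map key can't be null");
            }
            // put() replaces the value of an existing key, which gives last wins.
            result.put(key, values.get(i));
        }
        return result;
    }
}
```

With keys `a, b, a` and values `1, 2, 3`, the sketch keeps two entries and `a` maps to `3`, which matches how `java.util.Map` behaves when the same key is put twice.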
[GitHub] spark pull request #21732: [SPARK-24762][SQL] Enable Option of Product encod...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21732#discussion_r235840903 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala --- @@ -257,6 +251,11 @@ case class ExpressionEncoder[T]( */ def isSerializedAsStruct: Boolean = objSerializer.dataType.isInstanceOf[StructType] + /** + * Returns true if the type `T` is `Option`. + */ + def isOptionType: Boolean = classOf[Option[_]].isAssignableFrom(clsTag.runtimeClass) --- End diff -- sorry typo: `isSerializedAsStruct && !isOption` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21732: [SPARK-24762][SQL] Enable Option of Product encod...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21732#discussion_r235840884 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala --- @@ -257,6 +251,11 @@ case class ExpressionEncoder[T]( */ def isSerializedAsStruct: Boolean = objSerializer.dataType.isInstanceOf[StructType] + /** + * Returns true if the type `T` is `Option`. + */ + def isOptionType: Boolean = classOf[Option[_]].isAssignableFrom(clsTag.runtimeClass) --- End diff -- what do you mean? What I asked is a code style change, to make the code more maintainable. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23105: [SPARK-26140] Enable custom metrics implementatio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23105#discussion_r235834225 --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala --- @@ -48,7 +48,8 @@ private[spark] trait ShuffleManager { handle: ShuffleHandle, startPartition: Int, endPartition: Int, - context: TaskContext): ShuffleReader[K, C] + context: TaskContext, + metrics: ShuffleMetricsReporter): ShuffleReader[K, C] --- End diff -- IIUC, we should pass a read metrics reporter here, as this method is `getReader` which is called by the reducers to read shuffle files. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23105: [SPARK-26140] Enable custom metrics implementatio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23105#discussion_r235834136 --- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +/** + * An interface for reporting shuffle read metrics, for each shuffle. This interface assumes + * all the methods are called on a single-threaded, i.e. concrete implementations would not need + * to synchronize. + * + * All methods have additional Spark visibility modifier to allow public, concrete implementations + * that still have these methods marked as private[spark]. + */ +private[spark] trait ShuffleReadMetricsReporter { --- End diff -- how do we plan to use this interface later on? It's not used in this PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23105: [SPARK-26140] Enable custom metrics implementatio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23105#discussion_r235834088 --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleMetricsReporter.scala --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +/** + * An interface for reporting shuffle information, for each shuffle. This interface assumes --- End diff -- `for each shuffle` -> `for each reducer of a shuffle`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23043: [SPARK-26021][SQL] replace minus zero with zero in Platf...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23043 thanks, merging to master/2.4!
[GitHub] spark issue #23054: [SPARK-26085][SQL] Key attribute of non-struct type unde...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23054 thanks, merging to master!
[GitHub] spark issue #23052: [SPARK-26081][SQL] Prevent empty files for empty partiti...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23052 First of all, sometimes we do need to write "empty" files, so that we can infer the schema of a parquet directory. An empty parquet file is not really empty, as it has a header/footer. https://github.com/apache/spark/pull/20525 guarantees we always write out at least one empty file. One important thing is that when we write out an empty dataframe to a file and read it back, it should still be an empty dataframe. I'd suggest we skip empty files in text-based data sources, and later on send a followup PR to not write empty text files, as a perf improvement.
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22938#discussion_r235584943 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -1892,7 +1898,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { .text(path) val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path) - assert(jsonDF.count() === corruptRecordCount) + assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file --- End diff -- shall we skip empty files for all the file-based data sources? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22938: [SPARK-25935][SQL] Prevent null rows from JSON parser
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22938 thanks, merging to master!
[GitHub] spark issue #23043: [SPARK-26021][SQL] replace minus zero with zero in Platf...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23043 retest this please
[GitHub] spark issue #22938: [SPARK-25935][SQL] Prevent null rows from JSON parser
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22938 LGTM except the migration guide. The JSON data source can't produce null rows; it skips them even in permissive mode.
[GitHub] spark issue #23054: [SPARK-26085][SQL] Key attribute of non-struct type unde...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23054 hmmm it conflicts again...
[GitHub] spark issue #23043: [SPARK-26021][SQL] replace minus zero with zero in Platf...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23043 retest this please
[GitHub] spark issue #21732: [SPARK-24762][SQL] Enable Option of Product encoders
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21732 last comment, LGTM otherwise
[GitHub] spark pull request #21732: [SPARK-24762][SQL] Enable Option of Product encod...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21732#discussion_r235273262
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -257,6 +251,11 @@ case class ExpressionEncoder[T](
    */
   def isSerializedAsStruct: Boolean = objSerializer.dataType.isInstanceOf[StructType]

+  /**
+   * Returns true if the type `T` is `Option`.
+   */
+  def isOptionType: Boolean = classOf[Option[_]].isAssignableFrom(clsTag.runtimeClass)
--- End diff --

regarding maintainability, I think it's better to add a
```
// maybe there is a better naming
def isSerializedAsStructForTopLevel: Boolean = {
  isSerializedAsStruct && isOption
}
```
The benefit is that when people call `isSerializedAsStruct`, they will see this similar method and think carefully about which one they should use. Otherwise, people may easily forget to check `isOptionType`.
[GitHub] spark pull request #22149: [SPARK-25158][SQL]Executor accidentally exit beca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22149#discussion_r235266375
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala ---
@@ -308,6 +308,12 @@ private class ScriptTransformationWriterThread(
       }
       threwException = false
     } catch {
+      // TaskKilledException should not be thrown again, otherwise it will be captured by
+      // SparkUncaughtExceptionHandler, then Executor will exit because of TaskKilledException.
+      case e: TaskKilledException =>
--- End diff --

I see. So `ScriptTransformationExec` is special because
1. it starts a new thread
2. the new thread is very likely to throw `TaskKilledException` when speculation is on.

I think we should not kill the executor just because `ScriptTransformationWriterThread` fails. We should log the error instead of throwing it.
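The "log instead of throw" idea for a helper thread can be sketched in plain Java. This is only an illustration under stated assumptions: `WriterThreadSketch`, `KilledException`, and `runAndCapture` are hypothetical names, `KilledException` stands in for Spark's `TaskKilledException`, and `System.err` stands in for a real logger. It is not the code of `ScriptTransformationWriterThread`.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration only; not part of Spark.
final class WriterThreadSketch {

    /** Stand-in for an exception raised when the owning task is killed. */
    static final class KilledException extends RuntimeException {
        KilledException(String message) {
            super(message);
        }
    }

    /**
     * Runs the work on a separate thread. A KilledException is recorded and
     * logged inside the thread rather than rethrown, so it never reaches an
     * uncaught-exception handler. Returns the recorded failure, or null.
     */
    static Throwable runAndCapture(Runnable work) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread writer = new Thread(() -> {
            try {
                work.run();
            } catch (KilledException e) {
                // Record and log; do not rethrow from the helper thread.
                failure.set(e);
                System.err.println("writer thread stopped: " + e.getMessage());
            }
        }, "writer-sketch");
        writer.start();
        try {
            writer.join();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        return failure.get();
    }
}
```

The caller can inspect the returned failure and decide how to fail the task, which keeps that decision on the task's own thread.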
[GitHub] spark issue #23054: [SPARK-26085][SQL] Key attribute of non-struct type unde...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23054 retest this please
[GitHub] spark pull request #22149: [SPARK-25158][SQL]Executor accidentally exit beca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22149#discussion_r235253094 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala --- @@ -308,6 +308,12 @@ private class ScriptTransformationWriterThread( } threwException = false } catch { + // TaskKilledException should not be thrown again, otherwise it will be captured by + // SparkUncaughtExceptionHandler, then Executor will exit because of TaskKilledException. + case e: TaskKilledException => --- End diff -- > Actually ScriptTransformationExec like a streaming pipe, but other operators basically no such characteristics. can you elaborate on it? How `TaskKilledException` is handled in other operators? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23054: [SPARK-26085][SQL] Key attribute of non-struct ty...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23054#discussion_r235238609
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1594,6 +1594,15 @@ object SQLConf {
         "WHERE, which does not follow SQL standard.")
       .booleanConf
       .createWithDefault(false)
+
+  val ALIAS_NON_STRUCT_GROUPING_KEY_AS_VALUE =
+    buildConf("spark.sql.legacy.dataset.aliasNonStructGroupingKeyAsValue")
+      .internal()
+      .doc("When set to true, the key attribute resulted from running `Dataset.groupByKey` " +
+        "for non-struct key type, will be named as `value`, following the behavior of Spark " +
--- End diff --

So the key is "named" as `value`, not "aliased" to `value`. I think it would be more precise to call this config `spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue`.
[GitHub] spark pull request #22149: [SPARK-25158][SQL]Executor accidentally exit beca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22149#discussion_r235238304 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala --- @@ -308,6 +308,12 @@ private class ScriptTransformationWriterThread( } threwException = false } catch { + // TaskKilledException should not be thrown again, otherwise it will be captured by + // SparkUncaughtExceptionHandler, then Executor will exit because of TaskKilledException. + case e: TaskKilledException => --- End diff -- which line of code may throw this exception? and how is this exception handled in other operators? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23043: [SPARK-26021][SQL] replace minus zero with zero i...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23043#discussion_r235237427
--- Diff: common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java ---
@@ -157,4 +159,15 @@ public void heapMemoryReuse() {
     Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7);
     Assert.assertEquals(obj3, onheap4.getBaseObject());
   }
+
+  @Test
+  // SPARK-26021
+  public void writeMinusZeroIsReplacedWithZero() {
+    byte[] doubleBytes = new byte[Double.BYTES];
+    byte[] floatBytes = new byte[Float.BYTES];
+    Platform.putDouble(doubleBytes, Platform.BYTE_ARRAY_OFFSET, -0.0d);
+    Platform.putFloat(floatBytes, Platform.BYTE_ARRAY_OFFSET, -0.0f);
+    Assert.assertEquals(0, Double.compare(0.0d, ByteBuffer.wrap(doubleBytes).getDouble()));
+    Assert.assertEquals(0, Float.compare(0.0f, ByteBuffer.wrap(floatBytes).getFloat()));
--- End diff --

And it would be better to directly check that the binary representations of `0.0` and `-0.0` are the same.
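To make the "compare the binary" suggestion concrete, here is a small self-contained Java sketch. It uses only standard JDK calls (`Double.doubleToRawLongBits`, `Float.floatToRawIntBits`); the class `MinusZeroSketch` and its `normalize` and `sameBits` helpers are hypothetical and are not the actual `Platform.putDouble` / `Platform.putFloat` implementation.

```java
// Hypothetical illustration only; not Spark's Platform class.
final class MinusZeroSketch {

    /** Replaces -0.0d with 0.0d; all other values (including NaN) pass through. */
    static double normalize(double d) {
        // -0.0d == 0.0d is true for primitives, so both zeros take this branch.
        return d == 0.0d ? 0.0d : d;
    }

    /** Replaces -0.0f with 0.0f; all other values (including NaN) pass through. */
    static float normalize(float f) {
        return f == 0.0f ? 0.0f : f;
    }

    /** True when both doubles have exactly the same 64-bit pattern. */
    static boolean sameBits(double a, double b) {
        return Double.doubleToRawLongBits(a) == Double.doubleToRawLongBits(b);
    }

    /** True when both floats have exactly the same 32-bit pattern. */
    static boolean sameBits(float a, float b) {
        return Float.floatToRawIntBits(a) == Float.floatToRawIntBits(b);
    }
}
```

`0.0` and `-0.0` differ only in the sign bit, so `sameBits(0.0d, -0.0d)` is false while `sameBits(0.0d, normalize(-0.0d))` is true. A test that compares raw bits after writing `-0.0` checks the normalization directly, without depending on how `compare` orders the two zeros.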
[GitHub] spark issue #23043: [SPARK-26021][SQL] replace minus zero with zero in Platf...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23043 LGTM except one tiny comment
[GitHub] spark pull request #23043: [SPARK-26021][SQL] replace minus zero with zero i...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23043#discussion_r235237048 --- Diff: common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java --- @@ -157,4 +159,15 @@ public void heapMemoryReuse() { Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7); Assert.assertEquals(obj3, onheap4.getBaseObject()); } + + @Test + // SPARK-26021 + public void writeMinusZeroIsReplacedWithZero() { +byte[] doubleBytes = new byte[Double.BYTES]; +byte[] floatBytes = new byte[Float.BYTES]; +Platform.putDouble(doubleBytes, Platform.BYTE_ARRAY_OFFSET, -0.0d); +Platform.putFloat(floatBytes, Platform.BYTE_ARRAY_OFFSET, -0.0f); +Assert.assertEquals(0, Double.compare(0.0d, ByteBuffer.wrap(doubleBytes).getDouble())); +Assert.assertEquals(0, Float.compare(0.0f, ByteBuffer.wrap(floatBytes).getFloat())); --- End diff -- can we use `Platform.getFloat` to read the value back? to match how we write it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23043: [SPARK-26021][SQL] replace minus zero with zero in Platf...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23043 ok to test
[GitHub] spark issue #23084: [SPARK-26117][CORE][SQL]use SparkOutOfMemoryError instea...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23084
> for example, HashAggregateExec will carry out spill the map and fallback to sort-based

Do you mean this patch changes nothing for this case?
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r234855810 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * A physical representation of a data source scan for batch queries. This interface is used to + * provide physical information, like how many partitions the scanned data has, and how to read + * records from the partitions. + */ +@InterfaceStability.Evolving +public interface Batch { --- End diff -- I don't have a strong preference. I feel it's a little more clear to distinguish between scan and batch --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23069: [SPARK-26026][BUILD] Published Scaladoc jars missing fro...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23069 Is it a requirement to put the annotation class at the top level?
[GitHub] spark pull request #23043: [SPARK-26021][SQL] replace minus zero with zero i...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23043#discussion_r234847102 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala --- @@ -723,4 +723,32 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext { "grouping expressions: [current_date(None)], value: [key: int, value: string], " + "type: GroupBy]")) } + + test("SPARK-26021: Double and Float 0.0/-0.0 should be equal when grouping") { +val colName = "i" +def groupByCollect(df: DataFrame): Array[Row] = { + df.groupBy(colName).count().collect() +} +def assertResult[T](result: Array[Row], zero: T)(implicit ordering: Ordering[T]): Unit = { + assert(result.length == 1) + // using compare since 0.0 == -0.0 is true + assert(ordering.compare(result(0).getAs[T](0), zero) == 0) --- End diff -- ah sorry I misread the code. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23043: [SPARK-26021][SQL] replace minus zero with zero i...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23043#discussion_r234847137
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala ---
@@ -723,4 +723,32 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
         "grouping expressions: [current_date(None)], value: [key: int, value: string], " +
         "type: GroupBy]"))
   }
+
+  test("SPARK-26021: Double and Float 0.0/-0.0 should be equal when grouping") {
+    val colName = "i"
+    def groupByCollect(df: DataFrame): Array[Row] = {
+      df.groupBy(colName).count().collect()
+    }
+    def assertResult[T](result: Array[Row], zero: T)(implicit ordering: Ordering[T]): Unit = {
+      assert(result.length == 1)
+      // using compare since 0.0 == -0.0 is true
+      assert(ordering.compare(result(0).getAs[T](0), zero) == 0)
+      assert(result(0).getLong(1) == 3)
+    }
+
+    spark.conf.set("spark.sql.codegen.wholeStage", "false")
+    val doubles =
+      groupByCollect(Seq(0.0d, 0.0d, -0.0d).toDF(colName))
+    val doublesBoxed =
+      groupByCollect(Seq(Double.box(0.0d), Double.box(0.0d), Double.box(-0.0d)).toDF(colName))
+    val floats =
+      groupByCollect(Seq(0.0f, -0.0f, 0.0f).toDF(colName))
--- End diff --

Why do we have to turn off whole-stage codegen?
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r234844186
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * The base interface for v2 data sources which don't have a real catalog. Implementations must
+ * have a public, 0-arg constructor.
+ *
+ * The major responsibility of this interface is to return a {@link Table} for read/write.
+ */
+@InterfaceStability.Evolving
+// TODO: do not extend `DataSourceV2`, after we finish the API refactor completely.
+public interface TableProvider extends DataSourceV2 {
+
+  /**
+   * Return a {@link Table} instance to do read/write with user-specified options.
+   *
+   * @param options the user-specified options that can identify a table, e.g. file path, Kafka
+   *                topic name, etc. It's an immutable case-insensitive string-to-string map.
+   */
+  Table getTable(DataSourceOptions options);
+
+  /**
+   * Return a {@link Table} instance to do read/write with user-specified schema and options.
+   *
+   * By default this method throws {@link UnsupportedOperationException}, implementations should
+   * override this method to handle user-specified schema.
+   *
+   * @param options the user-specified options that can identify a table, e.g. file path, Kafka
+   *                topic name, etc. It's an immutable case-insensitive string-to-string map.
+   * @param schema the user-specified schema.
+   */
+  default Table getTable(DataSourceOptions options, StructType schema) {
--- End diff --

It's a different thing. Suppose you are reading a parquet file, you know exactly what its physical schema is, and you don't want Spark to waste a job inferring the schema. Then you can specify the schema when reading. Next, Spark will analyze the query and figure out what the required schema is. This step is automatic and driven by Spark.
[GitHub] spark issue #23054: [SPARK-26085][SQL] Key attribute of non-struct type unde...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23054 sorry it conflicts, can you resolve it? I think it's ready to go
[GitHub] spark issue #23079: [SPARK-26107][SQL] Extend ReplaceNullWithFalseInPredicat...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23079 thanks, merging to master!
[GitHub] spark issue #23084: [SPARK-26117][CORE][SQL]use SparkOutOfMemoryError instea...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23084 good catch! thanks!
[GitHub] spark issue #23084: [SPARK-26117][CORE][SQL]use SparkOutOfMemoryError instea...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23084 add to whitelist
[GitHub] spark issue #23084: [SPARK-26117][CORE][SQL]use SparkOutOfMemoryError instea...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23084 ok to test
[GitHub] spark pull request #23084: [SPARK-26117][CORE][SQL]use SparkOutOfMemoryError...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23084#discussion_r234661975

    --- Diff: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java ---
    @@ -741,7 +742,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
           if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
             try {
               growAndRehash();
    -        } catch (OutOfMemoryError oom) {
    +        } catch (SparkOutOfMemoryError oom) {
    --- End diff --

    Do you know what the behavior was before? Will we propagate the exception all the way up and kill the executor?
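The question turns on Java's catch semantics: a `catch` clause for a subclass does not intercept instances of its superclass. The sketch below illustrates only that language rule. `ManagedOutOfMemoryError` is a hypothetical stand-in for an engine-raised error that extends `OutOfMemoryError` (as `SparkOutOfMemoryError` does), and `grow` merely simulates failures. It says nothing about what Spark does further up the stack once an error escapes.

```java
import java.util.function.Supplier;

final class OomCatchDemo {
  // Hypothetical stand-in for an OOM raised by an engine's own memory manager.
  static final class ManagedOutOfMemoryError extends OutOfMemoryError {
    ManagedOutOfMemoryError(String message) { super(message); }
  }

  // Simulates a failed growth attempt; no memory is actually exhausted.
  static void grow(boolean managedError) {
    if (managedError) {
      throw new ManagedOutOfMemoryError("simulated: memory manager refused the request");
    }
    throw new OutOfMemoryError("simulated: JVM heap exhausted");
  }

  // Shape of the code before the change: every OutOfMemoryError is swallowed here.
  static String broadCatch(boolean managedError) {
    try {
      grow(managedError);
      return "grew";
    } catch (OutOfMemoryError oom) {
      return "handled";
    }
  }

  // Shape of the code after the change: only the managed subclass is swallowed.
  static String narrowCatch(boolean managedError) {
    try {
      grow(managedError);
      return "grew";
    } catch (ManagedOutOfMemoryError oom) {
      return "handled";
    }
  }

  // Reports whether an error escaped the method under test.
  static String observe(Supplier<String> call) {
    try {
      return call.get();
    } catch (OutOfMemoryError escaped) {
      return "propagated";
    }
  }

  public static void main(String[] args) {
    System.out.println(observe(() -> broadCatch(false)));  // handled
    System.out.println(observe(() -> narrowCatch(false))); // propagated
    System.out.println(observe(() -> narrowCatch(true)));  // handled
  }
}
```

With the narrower clause, a plain `OutOfMemoryError` is no longer handled locally and reaches whatever the caller does with it, which is the behavior the review comment asks about.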
[GitHub] spark issue #23086: [SPARK-25528][SQL] data source v2 API refactor (batch re...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23086 @rxin @rdblue @jose-torres @gatorsmile @gengliangwang @mccheah
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23086#discussion_r234650248

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala ---
    @@ -0,0 +1,120 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.v2
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.physical
    +import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
    +import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
    +import org.apache.spark.sql.execution.streaming.continuous._
    +import org.apache.spark.sql.sources.v2.DataSourceV2
    +import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport, MicroBatchReadSupport}
    +
    +/**
    + * Physical plan node for scanning data from a data source.
    + */
    +// TODO: micro-batch should be handled by `DataSourceV2ScanExec`, after we finish the API refactor
    +// completely.
    +case class DataSourceV2StreamingScanExec(
    --- End diff --

    I have to use two physical nodes, since batch and streaming have different APIs now.
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/23086

    [SPARK-25528][SQL] data source v2 API refactor (batch read)

    ## What changes were proposed in this pull request?

    This is the first step of the data source v2 API refactor [proposal](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing)

    It adds the new API for batch read, without removing the old APIs, as they are still needed for streaming sources.

    More concretely, it adds
    1. `TableProvider`, which works like an anonymous catalog.
    2. `Table`, which represents a structured data set.
    3. `ScanBuilder` and `Scan`, a logical representation of a data source scan.
    4. `Batch`, a physical representation of a data source batch scan.

    ## How was this patch tested?

    existing tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark refactor-batch

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23086.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #23086

commit f06b5c58b1a890d425abd575fa6f4c40da7c4b3d
Author: Wenchen Fan
Date:   2018-11-19T11:05:07Z

    data source v2 API refactor (batch read)
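The layering named in the description (provider, table, scan builder, scan, batch) can be pictured with a small self-contained Java sketch. The interfaces below borrow the names from the pull request but are simplified, hypothetical stand-ins: the method names `newScanBuilder`, `build`, `toBatch` and `planPartitions`, the `partitionSize` option, and the in-memory source are all assumptions made for illustration and should not be read as the signatures in the patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins; NOT the interfaces added by the pull request.
final class BatchReadSketch {
  interface Batch { List<List<String>> planPartitions(); }         // physical: how rows are split
  interface Scan { Batch toBatch(); }                              // logical description of one scan
  interface ScanBuilder { Scan build(); }                          // created once per scanning query
  interface Table { List<String> schema(); ScanBuilder newScanBuilder(); }
  interface TableProvider { Table getTable(Map<String, String> options); }

  // An in-memory source: the options identify the table, much like a path would.
  static final class InMemoryProvider implements TableProvider {
    private final List<String> rows;
    InMemoryProvider(List<String> rows) { this.rows = rows; }

    @Override public Table getTable(Map<String, String> options) {
      int size = Integer.parseInt(options.getOrDefault("partitionSize", "2"));
      return new Table() {
        @Override public List<String> schema() { return List.of("value"); }
        @Override public ScanBuilder newScanBuilder() {
          Batch batch = () -> split(rows, size); // physical plan
          Scan scan = () -> batch;               // logical scan wraps it
          return () -> scan;                     // builder hands out the scan
        }
      };
    }
  }

  // Cuts the rows into consecutive chunks of at most `size` elements.
  static List<List<String>> split(List<String> rows, int size) {
    List<List<String>> parts = new ArrayList<>();
    for (int i = 0; i < rows.size(); i += size) {
      parts.add(new ArrayList<>(rows.subList(i, Math.min(i + size, rows.size()))));
    }
    return parts;
  }

  // Walks the whole chain: provider -> table -> scan builder -> scan -> batch.
  static List<List<String>> read(TableProvider provider, Map<String, String> options) {
    return provider.getTable(options).newScanBuilder().build().toBatch().planPartitions();
  }

  public static void main(String[] args) {
    TableProvider provider = new InMemoryProvider(List.of("a", "b", "c", "d", "e"));
    System.out.println(read(provider, Map.of("partitionSize", "2"))); // [[a, b], [c, d], [e]]
  }
}
```

Keeping the logical `Scan` separate from the physical `Batch` mirrors the split the description draws between items 3 and 4; in this toy version the two are trivially linked.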
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user cloud-fan closed the pull request at: https://github.com/apache/spark/pull/22547
[GitHub] spark issue #23045: [SPARK-26071][SQL] disallow map as map key
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23045 thanks, merging to master!
[GitHub] spark pull request #23079: [SPARK-26107][SQL] Extend ReplaceNullWithFalseInP...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23079#discussion_r234639734

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
    @@ -767,6 +767,15 @@ object ReplaceNullWithFalse extends Rule[LogicalPlan] {
               replaceNullWithFalse(cond) -> value
             }
             cw.copy(branches = newBranches)
    +      case af @ ArrayFilter(_, lf @ LambdaFunction(func, _, _)) =>
    --- End diff --

    Ah, I see. Sorry, I missed it. Then it's safer to use a white-list here.
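Why this kind of rewrite is usually limited to known contexts can be shown with a small plain-Java model. This is a sketch, not Catalyst code: `filter`, `project` and `nullToFalse` are hypothetical helpers, and a nullable `Boolean` stands in for SQL's three-valued logic. Replacing null with false leaves the result unchanged where null and false are treated alike (a filter predicate), and changes it where the value itself is observed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class NullAsFalseDemo {
  // SQL-style filter: keep an element only when the predicate is TRUE; FALSE and null both drop it.
  static <T> List<T> filter(List<T> input, Function<T, Boolean> predicate) {
    List<T> out = new ArrayList<>();
    for (T item : input) {
      if (Boolean.TRUE.equals(predicate.apply(item))) {
        out.add(item);
      }
    }
    return out;
  }

  // A context that exposes the predicate's value, so null and FALSE stay distinguishable.
  static <T> List<Boolean> project(List<T> input, Function<T, Boolean> function) {
    List<Boolean> out = new ArrayList<>();
    for (T item : input) {
      out.add(function.apply(item));
    }
    return out;
  }

  // The rewrite under discussion, modeled on a function: null results become FALSE.
  static <T> Function<T, Boolean> nullToFalse(Function<T, Boolean> predicate) {
    return item -> {
      Boolean result = predicate.apply(item);
      return result == null ? Boolean.FALSE : result;
    };
  }

  public static void main(String[] args) {
    List<Integer> data = Arrays.asList(1, 2, null, 3);
    Function<Integer, Boolean> greaterThanOne = x -> x == null ? null : x > 1;

    // Inside a filter the rewrite is invisible.
    System.out.println(filter(data, greaterThanOne));              // [2, 3]
    System.out.println(filter(data, nullToFalse(greaterThanOne))); // [2, 3]

    // Inside a projection it changes the answer.
    System.out.println(project(data, greaterThanOne));              // [false, true, null, true]
    System.out.println(project(data, nullToFalse(greaterThanOne))); // [false, true, false, true]
  }
}
```

Listing the safe contexts explicitly, rather than applying the rewrite to every lambda, avoids the second case.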
[GitHub] spark issue #23025: [SPARK-26024][SQL]: Update documentation for repartition...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23025 thanks, merging to master!
[GitHub] spark issue #23082: [SPARK-26112][SQL] Update since versions of new built-in...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23082 thanks, merging to master!
[GitHub] spark issue #23040: [SPARK-26068][Core]ChunkedByteBufferInputStream should h...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23040 thanks, merging to master!