[GitHub] spark pull request #14698: [SPARK-17061][SPARK-17093][SQL] `MapObjects` shou...
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14698#discussion_r76579917

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala ---
@@ -136,7 +136,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
     // some expression is reusing variable names across different instances.
     // This behavior is tested in ExpressionEvalHelperSuite.
     val plan = generateProject(
-      GenerateUnsafeProjection.generate(
+      UnsafeProjection.create(
--- End diff --

@viirya maybe test against the following?
- + this patch's changes to ObjectExpressionsSuite.scala
- + this patch's changes to ExpressionEvalHelper.scala (this is also critical)
- - this patch's changes to objects.scala

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14298: [SPARK-16283][SQL] Implement `percentile_approx` ...
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14298#discussion_r76548403

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileApprox.scala ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.QuantileSummaries.Stats
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+
+/**
+ * Computes an approximate percentile (quantile) using the G-K algorithm (see below), for very
+ * large numbers of rows where the regular percentile() UDAF might run out of memory.
+ *
+ * The input is a single double value or an array of double values representing the percentiles
+ * requested. The output, corresponding to the input, is either a single double value or an
+ * array of doubles that are the percentile values.
+ */
+@ExpressionDescription(
+  usage = """_FUNC_(col, p [, B]) - Returns an approximate pth percentile of a numeric column in the
+    group. The B parameter, which defaults to 1000, controls approximation accuracy at the cost of
+    memory; higher values yield better approximations.
+_FUNC_(col, array(p1 [, p2]...) [, B]) - Same as above, but accepts and returns an array of
+    percentile values instead of a single one.
+""")
+case class PercentileApprox(
+    child: Expression,
+    percentilesExpr: Expression,
+    bExpr: Option[Expression],
+    percentiles: Seq[Double],  // the extracted percentiles
+    B: Int,                    // the extracted B
+    resultAsArray: Boolean,    // whether to return the result as an array
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends ImperativeAggregate {
+
+  private def this(child: Expression, percentilesExpr: Expression, bExpr: Option[Expression]) = {
+    this(
+      child = child,
+      percentilesExpr = percentilesExpr,
+      bExpr = bExpr,
+      // validate and extract percentiles
+      percentiles = PercentileApprox.validatePercentilesLiteral(percentilesExpr)._1,
+      // validate and extract B
+      B = bExpr.map(PercentileApprox.validateBLiteral(_)).getOrElse(PercentileApprox.B_DEFAULT),
+      // validate and mark whether we should return results as array of double or not
+      resultAsArray = PercentileApprox.validatePercentilesLiteral(percentilesExpr)._2)
+  }
+
+  // Constructor for the "_FUNC_(col, p) / _FUNC_(col, array(p1, ...))" form
+  def this(child: Expression, percentilesExpr: Expression) = {
+    this(child, percentilesExpr, None)
+  }
+
+  // Constructor for the "_FUNC_(col, p, B) / _FUNC_(col, array(p1, ...), B)" form
+  def this(child: Expression, percentilesExpr: Expression, bExpr: Expression) = {
+    this(child, percentilesExpr, Some(bExpr))
+  }
+
+  override def prettyName: String = "percentile_approx"
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override def children: Seq[Expression] =
+    bExpr.map(child :: percentilesExpr :: _ :: Nil).getOrElse(child :: percentilesExpr :: Nil)
+
+  // we would return null for empty inputs
+  override def nullable: Boolean = t
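The private constructor in the quoted diff extracts three things from its arguments: the requested percentiles, whether the result should be an array, and `B` with a default of 1000. A minimal standalone sketch of that extraction follows. It works on plain Scala values rather than Catalyst `Expression`s, and the names `PercentileArgs`, `validatePercentiles` and `validateB`, as well as the `[0, 1]` range check and the positivity check on `B`, are assumptions made for illustration and are not taken from the pull request.

```scala
// Hypothetical stand-in for the argument checks; not the code from the pull request.
object PercentileArgs {
  // Default for B, as stated in the usage text of the quoted diff.
  val DefaultB: Int = 1000

  /** Returns the requested percentiles and whether the result should be an array. */
  def validatePercentiles(arg: Either[Double, Seq[Double]]): (Seq[Double], Boolean) = {
    val (requested, asArray) = arg match {
      case Left(single)  => (Seq(single), false) // the "_FUNC_(col, p)" form
      case Right(values) => (values, true)       // the "_FUNC_(col, array(...))" form
    }
    // Assumption: an empty list is rejected and every percentile lies in [0, 1].
    require(requested.nonEmpty, "at least one percentile is required")
    require(requested.forall(p => p >= 0.0 && p <= 1.0),
      s"percentiles must be in [0, 1]: $requested")
    (requested, asArray)
  }

  /** Falls back to the default when B is absent; assumes B must be positive. */
  def validateB(b: Option[Int]): Int = {
    val value = b.getOrElse(DefaultB)
    require(value > 0, s"B must be positive: $value")
    value
  }
}
```

Returning the pair in one call mirrors the `._1` / `._2` accesses in the diff, where the same validation result feeds both `percentiles` and `resultAsArray`.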
[GitHub] spark issue #14698: [SPARK-17061][SPARK-17093][SQL] `MapObjects` should make...
Github user lw-lin commented on the issue:

    https://github.com/apache/spark/pull/14698

Thanks @hvanhovell for the review! This patch has been updated.
[GitHub] spark pull request #14698: [SPARK-17061][SPARK-17093][SQL] `MapObjects` shou...
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14698#discussion_r76177037

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala ---
@@ -136,7 +136,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
     // some expression is reusing variable names across different instances.
     // This behavior is tested in ExpressionEvalHelperSuite.
     val plan = generateProject(
-      GenerateUnsafeProjection.generate(
+      UnsafeProjection.create(
--- End diff --

Yes. `GenerateUnsafeProjection.generate` was not able to use unsafe-backed data structures here because the expression it receives is a `Create*Struct`. `UnsafeProjection.create`, however, does use unsafe-backed data structures (UnsafeRow, UnsafeArrayData, ...), which is what makes this test valid.
[GitHub] spark pull request #14698: [SPARK-17061][SPARK-17093][SQL] `MapObjects` shou...
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14698#discussion_r76176582

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala ---
@@ -474,6 +474,20 @@ case class MapObjects private(
       s"$seq == null ? $array[$loopIndex] : $seq.apply($loopIndex)"
     }

+    // Make a copy of the data if it's unsafe-backed
+    val genFunctionValue = lambdaFunction.dataType match {
--- End diff --

We're calling `toString` on a `UTF8String`, so maybe there's no need to clone `UTF8String`s?

```java
...
/* 072 */ value8 = MapObjects_loopValue2.getUTF8String(0);
...
/* 082 */ funcResult = value8.toString();
...
/* 086 */ value7 = (java.lang.String) funcResult;
...
/* 128 */ convertedArray[loopIndex] = ...;
```
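The reasoning behind that question can be illustrated without Spark. The sketch below uses a hypothetical `Utf8View` class (not Spark's `UTF8String`) that points into a byte buffer it does not own; it assumes, as the comment above suggests, that `toString` decodes into a freshly allocated `java.lang.String`, so the converted value survives later writes to the buffer.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical view type: a string that points into a byte buffer it does not own.
final class Utf8View(buffer: Array[Byte], offset: Int, length: Int) {
  // new String(...) decodes into newly allocated storage, so the result
  // no longer depends on `buffer`.
  override def toString: String = new String(buffer, offset, length, UTF_8)
}

object Utf8ViewDemo {
  /** Returns the string converted before the overwrite and the one converted after it. */
  def run(): (String, String) = {
    val shared = "spark".getBytes(UTF_8)
    val view = new Utf8View(shared, 0, shared.length)
    val converted = view.toString // snapshot taken here
    // Overwrite the shared buffer in place, as a reader might for the next record.
    Array.copy("flink".getBytes(UTF_8), 0, shared, 0, shared.length)
    (converted, view.toString)
  }
}
```

Under these assumptions the first element of the pair keeps the old contents while the view itself reflects the overwrite, which is why an extra clone before `toString` would be redundant.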
[GitHub] spark pull request #14698: [SPARK-17061][SPARK-17093][SQL] `MapObjects` shou...
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14698#discussion_r76176311

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala ---
@@ -474,6 +474,20 @@ case class MapObjects private(
       s"$seq == null ? $array[$loopIndex] : $seq.apply($loopIndex)"
     }

+    // Make a copy of the data if it's unsafe-backed
+    val genFunctionValue = lambdaFunction.dataType match {
+      case StructType(_) =>
+        s"(${genFunction.value} instanceof ${classOf[MutableRow].getName}? " +
--- End diff --

Done. Indeed, we should narrow this down to `UnsafeRow`, since the backing data of other `MutableRow`s is not shared even though they are mutable.
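The narrowing discussed here (copy only the type whose backing data is shared, pass everything else through) can be sketched with plain Scala pattern matching. `Row`, `SharedRow` and `OwnedRow` below are hypothetical stand-ins, not Spark classes; the real patch emits an `instanceof` check in generated Java code instead.

```scala
// Hypothetical row types; they only mimic the ownership distinction discussed above.
sealed trait Row { def values: Array[Int] }

// Shares its backing array with whoever produced it (stand-in for an unsafe-backed row).
final class SharedRow(val values: Array[Int]) extends Row {
  def copy(): SharedRow = new SharedRow(values.clone())
}

// Mutable, but owns its backing array (stand-in for the other mutable rows).
final class OwnedRow(val values: Array[Int]) extends Row

object CopyIfShared {
  // Copy only when the backing data may be overwritten by someone else.
  def apply(row: Row): Row = row match {
    case shared: SharedRow => shared.copy()
    case other             => other
  }
}
```

Matching on the narrower type avoids paying for a copy of rows that are mutable but never aliased.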
[GitHub] spark pull request #14698: [SPARK-17061][SPARK-17093][SQL] `MapObjects` shou...
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14698#discussion_r76176321

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala ---
@@ -474,6 +474,20 @@ case class MapObjects private(
       s"$seq == null ? $array[$loopIndex] : $seq.apply($loopIndex)"
     }

+    // Make a copy of the data if it's unsafe-backed
+    val genFunctionValue = lambdaFunction.dataType match {
+      case StructType(_) =>
+        s"(${genFunction.value} instanceof ${classOf[MutableRow].getName}? " +
+          s"${genFunction.value}.copy() : ${genFunction.value})"
+      case ArrayType(_, _) =>
+        s"(${genFunction.value} instanceof ${classOf[UnsafeArrayData].getName}? " +
--- End diff --

done
[GitHub] spark pull request #14298: [SPARK-16283][SQL] Implement `percentile_approx` ...
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14298#discussion_r75830247

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileApprox.scala ---
[GitHub] spark issue #14118: [SPARK-16462][SPARK-16460][SPARK-15144][SQL] Make CSV ca...
Github user lw-lin commented on the issue:

    https://github.com/apache/spark/pull/14118

@rxin yes, all empty (e.g. zero-length string) values become null once they are read back. E.g. given `test.csv`:

```
1,,3,
```

`spark.read.csv("test.csv").show()` produces:

```
+---+----+---+----+
|_c0| _c1|_c2| _c3|
+---+----+---+----+
|  1|null|  3|null|
+---+----+---+----+
```
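The mapping shown in that output can be mimicked in a few lines of plain Scala. This is a minimal sketch, not Spark's CSV reader: the hypothetical `EmptyToNull.parseLine` splits naively on commas (no quoting or escaping) and represents null as `None`.

```scala
object EmptyToNull {
  // Split one CSV line naively and map zero-length fields to None.
  // The -1 limit keeps trailing empty fields, which a plain split(",") would drop.
  def parseLine(line: String): Seq[Option[String]] =
    line.split(",", -1).toSeq.map(field => if (field.isEmpty) None else Some(field))
}
```

For the line `1,,3,` this yields four fields, with the second and fourth empty, matching the four columns in the table above.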
[GitHub] spark pull request #14118: [SPARK-16462][SPARK-16460][SPARK-15144][SQL] Make...
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14118#discussion_r75426062

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -370,7 +370,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * from values being read should be skipped.
    * `ignoreTrailingWhiteSpace` (default `false`): defines whether or not trailing
    * whitespaces from values being read should be skipped.
-   * `nullValue` (default empty string): sets the string representation of a null value.
+   * `nullValue` (default empty string): sets the string representation of a null value. Since
--- End diff --

Oh thanks! Indeed there are two occurrences (one in `readwriter.py`, one in `streaming.py`) that need fixing.
[GitHub] spark pull request #14698: [SPARK-17061][SPARK-17093][SQL] `MapObjects` shou...
GitHub user lw-lin opened a pull request:

    https://github.com/apache/spark/pull/14698

    [SPARK-17061][SPARK-17093][SQL] `MapObjects` should make copies of unsafe-backed data

    ## What changes were proposed in this pull request?

    Currently `MapObjects` does not make copies of unsafe-backed data, leading to problems like [SPARK-17061](https://issues.apache.org/jira/browse/SPARK-17061) and [SPARK-17093](https://issues.apache.org/jira/browse/SPARK-17093).

    This patch makes `MapObjects` make copies of unsafe-backed data.

    ## How was this patch tested?

    Added a new test case that would fail without this patch.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lw-lin/spark mapobjects-copy

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14698.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #14698

commit 4fc1ec51a938762a70bd3a50111b0b3a00e94955
Author: Liwei Lin <lwl...@gmail.com>
Date:   2016-08-18T02:53:30Z

    MapObjects should copy unsafe-backed data
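The class of problem described in this pull request can be reproduced without Spark. The sketch below is an assumption-laden analogy, not the actual `MapObjects` code: a hypothetical producer hands out one reused mutable buffer per record (standing in for unsafe-backed data), and collecting references without copying makes every collected element show the last record.

```scala
object ReusedBufferDemo {
  // A producer that returns the same mutable buffer for every record,
  // standing in for a reader that reuses one unsafe-backed row.
  def records(n: Int): Iterator[Array[Int]] = {
    val buffer = new Array[Int](1)
    Iterator.tabulate(n) { i => buffer(0) = i; buffer }
  }

  // Keeps references to the shared buffer: all elements alias the same array.
  def collectWithoutCopy(n: Int): List[Int] =
    records(n).toList.map(_(0))

  // Copies each record before the producer overwrites it.
  def collectWithCopy(n: Int): List[Int] =
    records(n).map(_.clone()).toList.map(_(0))
}
```

With three records, the version without copies reads the final value three times, while the copying version preserves each record as it was produced.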
[GitHub] spark issue #14118: [SPARK-16462][SPARK-16460][SPARK-15144][SQL] Make CSV ca...
Github user lw-lin commented on the issue:

    https://github.com/apache/spark/pull/14118

Jenkins retest this please
[GitHub] spark issue #14118: [SPARK-16462][SPARK-16460][SPARK-15144][SQL] Make CSV ca...
Github user lw-lin commented on the issue:

    https://github.com/apache/spark/pull/14118

This is ready for review.

To summarize, this patch casts user-specified `nullValue`s to `null`s for all supported types, including the string type:
- this fixes the problem where null dates, null timestamps, etc. were not cast to `null` correctly;
- this also casts `nullValue` to `null` for the string type, as per comments from @falaki, @HyukjinKwon and many others. But please note, this is a behavior change from 2.0.0.

@rxin @falaki could you take another look?
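The behavior summarized above amounts to checking the null marker before any type-specific conversion, and doing so for every type. A minimal sketch under that reading follows; `NullValueCast`, its `ColumnType` cases and the use of `Option` in place of `null` are hypothetical choices for illustration and do not reflect Spark's internal CSV casting code.

```scala
import java.time.LocalDate

object NullValueCast {
  sealed trait ColumnType
  case object IntColumn extends ColumnType
  case object DateColumn extends ColumnType
  case object StringColumn extends ColumnType

  // Check the null marker first, for every type (strings included), then convert.
  def cast(raw: String, tpe: ColumnType, nullValue: String = ""): Option[Any] =
    if (raw == nullValue) None
    else tpe match {
      case IntColumn    => Some(raw.toInt)
      case DateColumn   => Some(LocalDate.parse(raw)) // expects ISO format, e.g. 2016-08-18
      case StringColumn => Some(raw)
    }
}
```

Placing the marker check ahead of the `match` is what keeps a date or timestamp column from attempting to parse the marker itself, and it is also what changes the result for string columns.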
[GitHub] spark pull request #14298: [SPARK-16283][SQL] Implement `percentile_approx` ...
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14298#discussion_r73995159

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileApprox.scala ---
[GitHub] spark pull request #14298: [SPARK-16283][SQL] Implement `percentile_approx` ...
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14298#discussion_r73993189

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileApprox.scala ---
[GitHub] spark pull request #14298: [SPARK-16283][SQL] Implement `percentile_approx` ...
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14298#discussion_r73990892

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileApprox.scala ---
[GitHub] spark pull request #14118: [SPARK-16462][SPARK-16460][SPARK-15144][SQL] Make...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14118#discussion_r73666106 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala --- @@ -68,16 +68,48 @@ class CSVTypeCastSuite extends SparkFunSuite { } test("Nullable types are handled") { -assert(CSVTypeCast.castTo("", IntegerType, nullable = true, CSVOptions()) == null) +assertNull( + CSVTypeCast.castTo("-", ByteType, nullable = true, CSVOptions("nullValue", "-"))) +assertNull( + CSVTypeCast.castTo("-", ShortType, nullable = true, CSVOptions("nullValue", "-"))) +assertNull( + CSVTypeCast.castTo("-", IntegerType, nullable = true, CSVOptions("nullValue", "-"))) +assertNull( + CSVTypeCast.castTo("-", LongType, nullable = true, CSVOptions("nullValue", "-"))) +assertNull( + CSVTypeCast.castTo("-", FloatType, nullable = true, CSVOptions("nullValue", "-"))) +assertNull( + CSVTypeCast.castTo("-", DoubleType, nullable = true, CSVOptions("nullValue", "-"))) +assertNull( + CSVTypeCast.castTo("-", BooleanType, nullable = true, CSVOptions("nullValue", "-"))) +assertNull( + CSVTypeCast.castTo("-", DecimalType.DoubleDecimal, true, CSVOptions("nullValue", "-"))) +assertNull( + CSVTypeCast.castTo("-", TimestampType, nullable = true, CSVOptions("nullValue", "-"))) +assertNull( + CSVTypeCast.castTo("-", DateType, nullable = true, CSVOptions("nullValue", "-"))) +assertNull( + CSVTypeCast.castTo("-", StringType, nullable = true, CSVOptions("nullValue", "-"))) } - test("String type should always return the same as the input") { + test("String type should also respect `nullValue`") { assert( CSVTypeCast.castTo("", StringType, nullable = true, CSVOptions()) == -UTF8String.fromString("")) +null) assert( CSVTypeCast.castTo("", StringType, nullable = false, CSVOptions()) == UTF8String.fromString("")) + +assert( + CSVTypeCast.castTo("", StringType, nullable = true, CSVOptions("nullValue", "null")) == +UTF8String.fromString("")) +assert( + 
CSVTypeCast.castTo("", StringType, nullable = false, CSVOptions("nullValue", "null")) == +UTF8String.fromString("")) + +assert( + CSVTypeCast.castTo(null, StringType, nullable = true, CSVOptions("nullValue", "null")) == +null) --- End diff -- Oh thanks! I did this intentionally so that the change is clear to reviewers. Let's see what others think; I'm glad to change this if necessary. :)
[GitHub] spark issue #14118: [SPARK-16462][SPARK-16460][SPARK-15144][SQL] Make CSV ca...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14118 @falaki could you take a look at the latest update: [[bf01cea] StringType should also respect `nullValue`](https://github.com/apache/spark/pull/14118/commits/bf01cea8273f00386ceef6459f8b8fe2c169e12a)? Thanks!
[GitHub] spark issue #14298: [SPARK-16283][SQL] Implement `percentile_approx` SQL fun...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14298 @hvanhovell comments addressed. Please let me know if there's more to do.
[GitHub] spark pull request #14298: [SPARK-16283][SQL] Implement `percentile_approx` ...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14298#discussion_r73101674 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileApprox.scala --- @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.QuantileSummaries.Stats +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ + +/** + * Computes an approximate percentile (quantile) using the G-K algorithm (see below), for very + * large numbers of rows where the regular percentile() UDAF might run out of memory. + * + * The input is a single double value or an array of double values representing the percentiles + * requested. The output, corresponding to the input, is either an single double value or an + * array of doubles that are the percentile values. 
+ */ +@ExpressionDescription( + usage = """_FUNC_(col, p [, B]) - Returns an approximate pth percentile of a numeric column in the + group. The B parameter, which defaults to 1000, controls approximation accuracy at the cost of + memory; higher values yield better approximations. +_FUNC_(col, array(p1 [, p2]...) [, B]) - Same as above, but accepts and returns an array of + percentile values instead of a single one. +""") +case class PercentileApprox( +child: Expression, +percentilesExpr: Expression, +bExpr: Option[Expression], +percentiles: Seq[Double], // the extracted percentiles +B: Int,// the extracted B --- End diff -- I don't have a strong preference here -- let's see what reviewers say.
[GitHub] spark issue #14118: [SPARK-16462][SPARK-16460][SPARK-15144][SQL] Make CSV ca...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14118

Here are some findings as I dug a little:

1. Since https://github.com/databricks/spark-csv/pull/102 (Jul, 2015), we would cast `""` as `null` for all types other than strings. For strings, `""` would still be `""`;
2. Then we added `treatEmptyValuesAsNulls` in https://github.com/databricks/spark-csv/pull/147 (Sep, 2015), after which `""` would be `null` when `treatEmptyValuesAsNulls == true` and would still be `""` otherwise;
3. Then we added `nullValue` in https://github.com/databricks/spark-csv/pull/224 (Dec, 2015), so people could specify some string like `"MISSING"` other than the default `""` to represent null values.

After the above 1, 2 and 3, we have the following for strings, which seems reasonable and is backward-compatible:

| | (default) when `nullValue == ""` | when `nullValue == "MISSING"` |
|---|---|---|
| (default) when `treatEmptyValuesAsNulls == false` | `""` would cast to `""` | `""` would cast to `""` |
| when `treatEmptyValuesAsNulls == true` | `""` would cast to `null` | `""` would cast to `""` |

However, we don't have this `treatEmptyValuesAsNulls` in Spark 2.0. @falaki would it be OK with you if I add it back?
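The four cases in the matrix of the comment above can be condensed into one small function. This is only a sketch of those four cases for an empty field in a string column; `CsvEmptyStringSketch` and `castEmptyString` are hypothetical names that exist in neither spark-csv nor Spark, and `None` stands in for SQL `null`.

```scala
object CsvEmptyStringSketch {
  /**
   * Hypothetical helper: what an empty CSV field ("") becomes for a string column.
   * None models SQL null. Only the four cases listed in the comment are modeled;
   * non-empty input and non-string types are out of scope.
   */
  def castEmptyString(
      nullValue: String = "",
      treatEmptyValuesAsNulls: Boolean = false): Option[String] =
    if (treatEmptyValuesAsNulls && nullValue == "") None else Some("")

  def main(args: Array[String]): Unit = {
    // Print the 2 x 2 matrix, one case per line.
    for (treat <- Seq(false, true); nv <- Seq("", "MISSING")) {
      println(s"treatEmptyValuesAsNulls=$treat nullValue='$nv' -> ${castEmptyString(nv, treat)}")
    }
  }
}
```

Both parameters default to the values marked "(default)" in the comment, so calling `castEmptyString()` with no arguments keeps `""` as `""`.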
[GitHub] spark pull request #14370: [SPARK-16713][SQL] Check codegen method size ≤ ...
Github user lw-lin closed the pull request at: https://github.com/apache/spark/pull/14370
[GitHub] spark issue #14370: [SPARK-16713][SQL] Check codegen method size ≤ 8K on c...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14370

@rxin @cloud-fan sorry for the ambiguity, but we're not enforcing this. There are 3 levels:

- when `NO_OP` is on (which is the default), we do nothing at all to a huge method;
- when `WARN_...` is on, we'll just log a warning;
- when `ERROR...` is on, we fail fast.

So by default, this does not affect anything on the end user's side. We should turn on `ERROR_IF_EXCEEDS_JIT_LIMIT` **_only in test suites_** and only against critical methods, so that we can find performance issues asap because it fails fast.
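The three levels described in the comment above could be dispatched roughly as follows. This is a minimal sketch, not the patch's code: the hint names come from the PR description, while `SizeHintSketch`, `check`, and the 8000-byte threshold are assumptions made for illustration (the thread only says "8K").

```scala
object SizeHintSketch {
  sealed trait FunctionSizeHint
  case object NO_OP extends FunctionSizeHint
  case object WARN_IF_EXCEEDS_JIT_LIMIT extends FunctionSizeHint
  case object ERROR_IF_EXCEEDS_JIT_LIMIT extends FunctionSizeHint

  // Assumption: the thread only says "8K"; 8000 bytes is used here for illustration.
  val JitLimitBytes: Int = 8000

  /**
   * Hypothetical check. Returns a warning message for the WARN level, None when
   * nothing needs reporting, and throws for the ERROR level (fail fast).
   */
  def check(
      method: String,
      sizeInBytes: Int,
      hint: FunctionSizeHint = NO_OP): Option[String] = {
    val message =
      s"Method $method should not exceed 8K size limit -- observed size is $sizeInBytes"
    hint match {
      case _ if sizeInBytes <= JitLimitBytes => None // small enough: nothing to do
      case NO_OP => None                             // default: leave huge methods alone
      case WARN_IF_EXCEEDS_JIT_LIMIT => Some(message)
      case ERROR_IF_EXCEEDS_JIT_LIMIT =>
        throw new IllegalStateException(s"failed to compile. $message")
    }
  }
}
```

Because `NO_OP` is the default argument, callers that never mention a hint see no change in behavior, which is the point the comment makes about end users.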
[GitHub] spark issue #14370: [SPARK-16713][SQL] Check codegen method size ≤ 8K on c...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14370 @davies would you also take a look? Thanks!
[GitHub] spark pull request #14370: [SPARK-16713][SQL] Check codegen method size ≤ ...
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/14370

[SPARK-16713][SQL] Check codegen method size ≤ 8K on compile

## What changes were proposed in this pull request?

Ideally, codegen methods should be less than 8KB in bytecode size. Beyond 8K, the JIT won't compile the method, which can cause performance degradation. Instead of understanding the generated source code and automatically breaking large methods into smaller ones (which is also a little hard to do), it would be better to discover large methods asap and then manually improve the source code.

This patch adds support for checking codegen method size on compile. For each method, we can specify the expected behavior when it exceeds 8KB in bytecode size:

```scala
/** No-op when the method exceeds 8K size. */
case object NO_OP extends FunctionSizeHint

/** Log a warning when the method exceeds 8K size. */
case object WARN_IF_EXCEEDS_JIT_LIMIT extends FunctionSizeHint

/**
 * Throw a compilation exception when the method exceeds 8K size.
 * Fail fast so that we can catch it asap; this is useful in testing corner/edge cases.
 */
case object ERROR_IF_EXCEEDS_JIT_LIMIT extends FunctionSizeHint
```

This way we can test against some extreme case such as a 1-columns-wide table, to see if the generated code is small enough to get a chance to be JITed at runtime.

## Sample Usage

sample usage:

```scala
val codeBody = s"""
  public static void inc() {
    int i = 0;
    i++;
    i++;
    ... // enough i++ s for this inc() methods to exceed 8K size
  }
"""

// == prior to this patch ===
ctx = new CodegenContext()
ctx.addNewFunction("inc", genCode(15000))
CodeGenerator.compile(
  new CodeAndComment(ctx.declareAddedFunctions(), emptyComments))

// == after this patch ==
// Exception: failed to compile. Method org.apache.spark.sql.catalyst.expressions.GeneratedClass.inc
// should not exceed 8K size limit -- observed size is 45003
ctx = new CodegenContext()
ctx.addNewFunction("inc", genCode(15000), CodegenContext.ERROR_IF_EXCEEDS_JIT_LIMIT)
CodeGenerator.compile(
  new CodeAndComment(ctx.declareAddedFunctions(), emptyComments, ctx.getFuncToSizeHintMap))
```

## How was this patch tested?

- new unit test

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lw-lin/spark codegen-method-size-8k

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14370.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #14370

commit 19b6d1d056f2bd1b1bb4d502c31b418ffbfe8d65
Author: Liwei Lin <lwl...@gmail.com>
Date: 2016-07-26T08:39:36Z

    Check codegen method size ≤ 8K on compile
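The sample above calls `genCode(15000)` without showing it. One way such a helper might look is sketched below, together with a back-of-the-envelope estimate of the resulting bytecode size. `GenCodeSketch`, this `genCode`, and `estimatedBytecodeSize` are hypothetical; the estimate assumes the compiler emits `iconst_0` plus `istore_0` (2 bytes) for `int i = 0;`, one 3-byte `iinc` per `i++;`, and a 1-byte `return`.

```scala
object GenCodeSketch {
  /** Hypothetical stand-in for the `genCode(n)` helper that the sample calls but does not show. */
  def genCode(increments: Int): String = {
    val lines =
      Seq("public static void inc() {", "  int i = 0;") ++
        Seq.fill(increments)("  i++;") ++
        Seq("}")
    lines.mkString("\n")
  }

  /**
   * Rough size estimate in bytes, under the assumptions stated above:
   * 2 bytes for the initialization, 3 bytes per increment, 1 byte for the return.
   */
  def estimatedBytecodeSize(increments: Int): Int = 2 + 3 * increments + 1

  def main(args: Array[String]): Unit = {
    println(genCode(2))
    println(estimatedBytecodeSize(15000))
  }
}
```

Under these assumptions, 15000 increments come to 45003 bytes, which agrees with the observed size quoted in the exception message.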
[GitHub] spark issue #14298: [SPARK-16283][SQL] Implement `percentile_approx` SQL fun...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14298 Jenkins retest this please
[GitHub] spark issue #14324: [SPARK-16664][SQL] Fix persist call on Data frames with ...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14324 @breakdawn what else can we do to actually fix the ≥ 8118 cols issue? We're actually running out of the constant pool when we compile the generated code. So maybe compile it into multiple classes? Or just fall back to the non-code-gen path?
[GitHub] spark issue #14280: [SPARK-16515][SQL][FOLLOW-UP] Fix test `script` on OS X/...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14280 Maybe this is ready to go?
[GitHub] spark pull request #14280: [SPARK-16515][SQL][FOLLOW-UP] Fix test `script` o...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14280#discussion_r71971600

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---
@@ -64,14 +67,17 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   import spark.implicits._

   test("script") {
-    val df = Seq(("x1", "y1", "z1"), ("x2", "y2", "z2")).toDF("c1", "c2", "c3")
-    df.createOrReplaceTempView("script_table")
-    val query1 = sql(
-      """
-        |SELECT col1 FROM (from(SELECT c1, c2, c3 FROM script_table) tempt_table
-        |REDUCE c1, c2, c3 USING 'bash src/test/resources/test_script.sh' AS
-        |(col1 STRING, col2 STRING)) script_test_table""".stripMargin)
-    checkAnswer(query1, Row("x1_y1") :: Row("x2_y2") :: Nil)
+    if (testCommandAvailable("bash") && testCommandAvailable("echo | sed")) {
+      val df = Seq(("x1", "y1", "z1"), ("x2", "y2", "z2")).toDF("c1", "c2", "c3")
+      df.createOrReplaceTempView("script_table")
+      val query1 = sql(
+        """
+          |SELECT col1 FROM (from(SELECT c1, c2, c3 FROM script_table) tempt_table
+          |REDUCE c1, c2, c3 USING 'bash src/test/resources/test_script.sh' AS
+          |(col1 STRING, col2 STRING)) script_test_table""".stripMargin)
+      checkAnswer(query1, Row("x1_y1") :: Row("x2_y2") :: Nil)
+    }
+    // else skip this test
--- End diff --

The only change here was the if check; i.e.

    if (testCommandAvailable("bash") && testCommandAvailable("echo | sed")) {
      // everything left unchanged
    }
    // else skip this test
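The guard pattern in the diff above, running a test body only when the external commands it needs are present, can be sketched with the standard `scala.sys.process` package. `CommandGuardSketch`, `commandAvailable`, and `runIfAvailable` are hypothetical names for illustration; this is not the `testCommandAvailable` helper used in the diff, whose implementation is not shown in this thread.

```scala
import scala.sys.process.{Process, ProcessLogger}
import scala.util.Try

object CommandGuardSketch {
  /**
   * Hypothetical check: true if the command can be started and exits with status 0.
   * Output is discarded; a command that cannot be started counts as unavailable.
   */
  def commandAvailable(command: String): Boolean =
    Try(Process(command).!(ProcessLogger((_: String) => ())) == 0).getOrElse(false)

  /** Runs `body` only when every listed command is available; returns whether it ran. */
  def runIfAvailable(commands: String*)(body: => Unit): Boolean = {
    val allPresent = commands.forall(commandAvailable)
    if (allPresent) body
    allPresent
  }
}
```

Returning a `Boolean` lets the caller log or count skipped bodies, whereas the diff's version skips silently.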
[GitHub] spark issue #14324: [SPARK-16664][SQL] Fix persist call on Data frames with ...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14324 @breakdawn yes that's a different issue and I'm looking into it. Regarding what this PR tries to fix, could you run this PR's change against [this test case](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala#L225) to see if more needs to be done?
[GitHub] spark issue #14324: [SPARK-16664][SQL] Fix persist call on Data frames with ...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14324 @breakdawn it'd be great to do more tests when you open a request. As I'm investigating this too, I found that the same fix works for 201 cols but fails for 8118 cols. The exact limit is 8117.
[GitHub] spark issue #14298: [SPARK-16283][SQL] Implement `percentile_approx` SQL fun...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14298 @cloud-fan could you also help review this? Thanks!
[GitHub] spark pull request #14298: [SPARK-16283][SQL] Implement `percentile_approx` ...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14298#discussion_r71815241 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileApprox.scala --- @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.QuantileSummaries.Stats +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ + +/** + * Computes an approximate percentile (quantile) using the G-K algorithm (see below), for very + * large numbers of rows where the regular percentile() UDAF might run out of memory. + * + * The input is a single double value or an array of double values representing the percentiles + * requested. The output, corresponding to the input, is either an single double value or an + * array of doubles that are the percentile values. 
+ */ +@ExpressionDescription( + usage = """_FUNC_(col, p [, B]) - Returns an approximate pth percentile of a numeric column in the + group. The B parameter, which defaults to 1000, controls approximation accuracy at the cost of + memory; higher values yield better approximations. +_FUNC_(col, array(p1 [, p2]...) [, B]) - Same as above, but accepts and returns an array of + percentile values instead of a single one. +""") +case class PercentileApprox( +child: Expression, +percentilesExpr: Expression, +bExpr: Option[Expression], +percentiles: Seq[Double], // the extracted percentiles +B: Int,// the extracted B +resultAsArray: Boolean,// whether to return the result as an array +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) extends ImperativeAggregate { + + private def this(child: Expression, percentilesExpr: Expression, bExpr: Option[Expression]) = { +this( + child = child, + percentilesExpr = percentilesExpr, + bExpr = bExpr, + // validate and extract percentiles + percentiles = PercentileApprox.validatePercentilesLiteral(percentilesExpr)._1, + // validate and extract B + B = bExpr.map(PercentileApprox.validateBLiteral(_)).getOrElse(PercentileApprox.B_DEFAULT), + // validate and mark whether we should return results as array of double or not + resultAsArray = PercentileApprox.validatePercentilesLiteral(percentilesExpr)._2) + } + + // Constructor for the "_FUNC_(col, p) / _FUNC_(col, array(p1, ...))" form + def this(child: Expression, percentilesExpr: Expression) = { +this(child, percentilesExpr, None) + } + + // Constructor for the "_FUNC_(col, p, B) / _FUNC_(col, array(p1, ...), B)" form + def this(child: Expression, percentilesExpr: Expression, bExpr: Expression) = { +this(child, percentilesExpr, Some(bExpr)) + } + + override def prettyName: String = "percentile_approx" + + override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate = +copy(mutableAggBufferOffset = newMutableAggBufferOffset) + + override def 
withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = +copy(inputAggBufferOffset = newInputAggBufferOffset) + + override def children: Seq[Expression] = +bExpr.map(child :: percentilesExpr :: _ :: Nil).getOrElse(child :: percentilesExpr :: Nil) + + // we would return null for empty inputs + override def nullable: Boolean = t
[GitHub] spark issue #14298: [SPARK-16283][SQL] Implement `percentile_approx` SQL fun...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14298 @hvanhovell could you take a look at this? Thanks!
[GitHub] spark issue #14298: [SPARK-16283][SQL] Implement `percentile_approx` SQL fun...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14298 Jenkins retest this please
[GitHub] spark pull request #14298: [SPARK-16283][SQL] Implement `percentile_approx` ...
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/14298

[SPARK-16283][SQL] Implement `percentile_approx` SQL function

## What changes were proposed in this pull request?

This patch implements the `percentile_approx` SQL function using Spark's implementation of the G-K algorithm.

- commit 1: moves the G-K algorithm implementation (`QuantileSummaries` and related tests) from `sql/core` to `sql/catalyst`
- commit 2: implements `percentile_approx` using the G-K algorithm

## How was this patch tested?

- Jenkins
- added new tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lw-lin/spark impl_percentile_approx

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14298.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #14298

commit d3a6dc825577a4a5e44e8eb0f8e61ef2053e127d
Author: Liwei Lin <lwl...@gmail.com>
Date: 2016-07-21T08:29:00Z

    Move G-K all from `sql/core` to `sql/catalyst`

commit 110158062cb1f6a571ad8e0bab9bc5962107b59a
Author: Liwei Lin <lwl...@gmail.com>
Date: 2016-07-21T08:38:06Z

    Implement percentile_approx
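To make the two call forms of the function concrete (a single percentile in, a single value out; an array in, an array out), here is a small sketch that mirrors only the input and output shapes. It computes an exact nearest-rank percentile by sorting and is explicitly NOT the G-K algorithm the patch uses. `PercentileShapeSketch`, `percentile`, and `percentiles` are hypothetical names, the `[0, 1]` range for `p` is an assumption, and `None` models the `null` returned for empty input.

```scala
object PercentileShapeSketch {
  /**
   * Exact nearest-rank percentile of `values`, or None for empty input.
   * A sort-based stand-in for illustration only; not the G-K algorithm.
   */
  def percentile(values: Seq[Double], p: Double): Option[Double] = {
    require(p >= 0.0 && p <= 1.0, s"percentile must be in [0, 1], got $p")
    if (values.isEmpty) None
    else {
      val sorted = values.sorted
      val rank = math.ceil(p * sorted.length).toInt // 1-based nearest rank
      Some(sorted(math.max(rank - 1, 0)))           // clamp so that p = 0 maps to the minimum
    }
  }

  /** Array form: one result per requested percentile, in the same order. */
  def percentiles(values: Seq[Double], ps: Seq[Double]): Option[Seq[Double]] =
    if (values.isEmpty) None else Some(ps.flatMap(p => percentile(values, p)))
}
```

Sorting holds the whole input in memory at once, which is the cost an approximate summary is meant to avoid for very large inputs.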
[GitHub] spark pull request #14237: [WIP][SPARK-16283][SQL] Implement `percentile_app...
Github user lw-lin closed the pull request at: https://github.com/apache/spark/pull/14237
[GitHub] spark pull request #14237: [WIP][SPARK-16283][SQL] Implement `percentile_app...
GitHub user lw-lin reopened a pull request: https://github.com/apache/spark/pull/14237 [WIP][SPARK-16283][SQL] Implement `percentile_approx` SQL function I'll reopen once it's ready for review, thanks! You can merge this pull request into a Git repository by running: $ git pull https://github.com/lw-lin/spark percentile_approx Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14237.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14237 commit 479bf7387f0dcba41ce6ab25b7008c7fd6dd7b07 Author: Liwei Lin <lwl...@gmail.com> Date: 2016-07-17T12:53:15Z Implement function `percentile_approx` commit 9d75c0a9fae00c40ef931a7c643a45161990cda4 Author: Reynold Xin <r...@databricks.com> Date: 2016-07-17T06:42:28Z [SPARK-16584][SQL] Move regexp unit tests to RegexpExpressionsSuite ## What changes were proposed in this pull request? This patch moves regexp related unit tests from StringExpressionsSuite to RegexpExpressionsSuite to match the file name for regexp expressions. ## How was this patch tested? This is a test only change. Author: Reynold Xin <r...@databricks.com> Closes #14230 from rxin/SPARK-16584. commit f7ec0233471c9a4acd4cfe7df28ca96f0fda0c61 Author: Felix Cheung <felixcheun...@hotmail.com> Date: 2016-07-18T02:02:21Z [SPARK-16027][SPARKR] Fix R tests SparkSession init/stop ## What changes were proposed in this pull request? Fix R SparkSession init/stop, and warnings of reusing existing Spark Context ## How was this patch tested? unit tests shivaram Author: Felix Cheung <felixcheun...@hotmail.com> Closes #14177 from felixcheung/rsessiontest. commit 7fcb4231dd940fba91047ea192d569a4763b7631 Author: Reynold Xin <r...@databricks.com> Date: 2016-07-18T05:48:00Z [SPARK-16588][SQL] Deprecate monotonicallyIncreasingId in Scala/Java This patch deprecates monotonicallyIncreasingId in Scala/Java, as done in Python. 
This patch was originally written by HyukjinKwon. Closes #14236. commit ceed2f29c9c34cd0663bef1fb984066b5a687805 Author: WeichenXu <weichenxu...@outlook.com> Date: 2016-07-18T08:11:53Z [MINOR][TYPO] fix fininsh typo ## What changes were proposed in this pull request? fininsh => finish ## How was this patch tested? N/A Author: WeichenXu <weichenxu...@outlook.com> Closes #14238 from WeichenXu123/fix_fininsh_typo. commit d635cc21baea6e28313c6deea41e5e45353a9014 Author: krishnakalyan3 <krishnakaly...@gmail.com> Date: 2016-07-18T16:46:23Z [SPARK-16055][SPARKR] warning added while using sparkPackages with spark-submit ## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-16055 sparkPackages - argument is passed and we detect that we are in the R script mode, we should print some warning like --packages flag should be used with with spark-submit ## How was this patch tested? In my system locally Author: krishnakalyan3 <krishnakaly...@gmail.com> Closes #14179 from krishnakalyan3/spark-pkg. commit e01f19582cc724028b60bcf1ee1f8b4d33d91efd Author: hyukjinkwon <gurwls...@gmail.com> Date: 2016-07-18T16:49:14Z [SPARK-16351][SQL] Avoid per-record type dispatch in JSON when writing ## What changes were proposed in this pull request? Currently, `JacksonGenerator.apply` is doing type-based dispatch for each row to write appropriate values. It might not have to be done like this because the schema is already kept. So, appropriate writers can be created first according to the schema once, and then apply them to each row. This approach is similar with `CatalystWriteSupport`. This PR corrects `JacksonGenerator` so that it creates all writers for the schema once and then applies them to each row rather than type dispatching for every row. 
Benchmark was proceeded with the codes below: ```scala test("Benchmark for JSON writer") { val N = 500 << 8 val row = """{"struct":{"field1": true, "field2": 92233720368547758070}, "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]}, "arrayOfString":["str1", "str2"], "arrayOfInteger":[1, 2147483647, -2147483648], "arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808], "arrayOfBigInteger":[922337203685477580700, -922337203685477580800], "arrayOfDouble":[1.2,
[GitHub] spark issue #14280: [SPARK-16515][SQL][FOLLOW-UP] Fix test `script` on OS X/...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14280 `sed $'...'` works on both Linux & OS X! So let's switch back to `sed`. @srowen thanks a lot for the `$`!
[GitHub] spark pull request #14280: [SPARK-16515][SQL][FOLLOW-UP] Fix test `script` o...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14280#discussion_r71480386 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala --- @@ -64,14 +67,19 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { import spark.implicits._ test("script") { -val df = Seq(("x1", "y1", "z1"), ("x2", "y2", "z2")).toDF("c1", "c2", "c3") -df.createOrReplaceTempView("script_table") -val query1 = sql( - """ -|SELECT col1 FROM (from(SELECT c1, c2, c3 FROM script_table) tempt_table -|REDUCE c1, c2, c3 USING 'bash src/test/resources/test_script.sh' AS -|(col1 STRING, col2 STRING)) script_test_table""".stripMargin) -checkAnswer(query1, Row("x1_y1") :: Row("x2_y2") :: Nil) +if (testCommandAvailable("bash") && testCommandAvailable("echo | awk '{print $0}'")) { + val df = Seq(("x1", "y1", "z1"), ("x2", "y2", "z2")).toDF("c1", "c2", "c3") + df.createOrReplaceTempView("script_table") + val query1 = sql( +""" + |SELECT col1 FROM (from(SELECT c1, c2, c3 FROM script_table) tempt_table + |REDUCE c1, c2, c3 USING 'bash src/test/resources/test_script.sh' AS + |(col1 STRING, col2 STRING)) script_test_table""".stripMargin) + checkAnswer(query1, Row("x1_y1") :: Row("x2_y2") :: Nil) +} +else { --- End diff -- removed; thanks!
[GitHub] spark pull request #14280: [SPARK-16515][SQL][FOLLOW-UP] Fix test `script` o...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14280#discussion_r71480336 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala --- @@ -64,14 +67,19 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { import spark.implicits._ test("script") { -val df = Seq(("x1", "y1", "z1"), ("x2", "y2", "z2")).toDF("c1", "c2", "c3") -df.createOrReplaceTempView("script_table") -val query1 = sql( - """ -|SELECT col1 FROM (from(SELECT c1, c2, c3 FROM script_table) tempt_table -|REDUCE c1, c2, c3 USING 'bash src/test/resources/test_script.sh' AS -|(col1 STRING, col2 STRING)) script_test_table""".stripMargin) -checkAnswer(query1, Row("x1_y1") :: Row("x2_y2") :: Nil) +if (testCommandAvailable("bash") && testCommandAvailable("echo | awk '{print $0}'")) { + val df = Seq(("x1", "y1", "z1"), ("x2", "y2", "z2")).toDF("c1", "c2", "c3") + df.createOrReplaceTempView("script_table") + val query1 = sql( +""" + |SELECT col1 FROM (from(SELECT c1, c2, c3 FROM script_table) tempt_table + |REDUCE c1, c2, c3 USING 'bash src/test/resources/test_script.sh' AS + |(col1 STRING, col2 STRING)) script_test_table""".stripMargin) + checkAnswer(query1, Row("x1_y1") :: Row("x2_y2") :: Nil) +} +else { + assert(true) +} --- End diff -- removed; thanks!
[GitHub] spark pull request #14280: [SPARK-16515][SQL][FOLLOW-UP] Fix test `script` o...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14280#discussion_r71477795 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala --- @@ -64,14 +67,19 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { import spark.implicits._ test("script") { -val df = Seq(("x1", "y1", "z1"), ("x2", "y2", "z2")).toDF("c1", "c2", "c3") -df.createOrReplaceTempView("script_table") -val query1 = sql( - """ -|SELECT col1 FROM (from(SELECT c1, c2, c3 FROM script_table) tempt_table -|REDUCE c1, c2, c3 USING 'bash src/test/resources/test_script.sh' AS -|(col1 STRING, col2 STRING)) script_test_table""".stripMargin) -checkAnswer(query1, Row("x1_y1") :: Row("x2_y2") :: Nil) +if (testCommandAvailable("bash") && testCommandAvailable("echo | awk '{print $0}'")) { + val df = Seq(("x1", "y1", "z1"), ("x2", "y2", "z2")).toDF("c1", "c2", "c3") + df.createOrReplaceTempView("script_table") + val query1 = sql( +""" + |SELECT col1 FROM (from(SELECT c1, c2, c3 FROM script_table) tempt_table + |REDUCE c1, c2, c3 USING 'bash src/test/resources/test_script.sh' AS + |(col1 STRING, col2 STRING)) script_test_table""".stripMargin) + checkAnswer(query1, Row("x1_y1") :: Row("x2_y2") :: Nil) +} +else { + assert(true) +} --- End diff -- The only change here was the `if` check; i.e.

```scala
if (testCommandAvailable("bash") && testCommandAvailable("echo | awk '{print $0}'")) {
  // everything left unchanged
} else {
  assert(true)
}
```
[GitHub] spark pull request #14280: [SPARK-16515][SQL][FOLLOW-UP] Fix test `script` o...
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/14280 [SPARK-16515][SQL][FOLLOW-UP] Fix test `script` on OS X/Windows... ## Problem OS X's `sed` doesn't understand `\t` at all, so this `script` test would fail: ``` == Results == !== Correct Answer - 2 == == Spark Answer - 2 == ![x1_y1][x1] ![x2_y2][x2] ``` In addition, this `script` test would also fail on systems like Windows where we cannot invoke `bash` or `echo ... | awk ...`. ## What changes were proposed in this pull request? This patch - switches from `sed` to `awk` for replacing `\t` - adds command guards so that the `script` test would pass on systems like Windows ## How was this patch tested? - Jenkins - Manually verified tests pass on OS X You can merge this pull request into a Git repository by running: $ git pull https://github.com/lw-lin/spark osx-sed Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14280.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14280 commit 35773defdcc1ed0a4f6044e805fab69cf5323df6 Author: Liwei Lin <lwl...@gmail.com> Date: 2016-07-20T06:56:12Z `sed` -> `awk` commit dc39c98a8c624ee35f8df1fb824a85f7c4c3741e Author: Liwei Lin <lwl...@gmail.com> Date: 2016-07-20T07:17:53Z Add `testCommandAvailable` gards
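The command guard described in the PR above can be illustrated with a small standalone sketch. This is not Spark's `testCommandAvailable`; the object `CommandGuard` and the helpers `commandAvailable` and `ifAvailable` are hypothetical names chosen for illustration, and the sketch only assumes the standard `scala.sys.process` API.

```scala
import scala.sys.process.{Process, ProcessLogger}
import scala.util.Try

// Hypothetical helper, not Spark's implementation.
object CommandGuard {
  /** Returns true if `command` can be started and exits with status 0. */
  def commandAvailable(command: String): Boolean = {
    // Output is discarded; a missing binary makes `run` throw, which Try captures.
    val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
    attempt.toOption.contains(0)
  }

  /** Runs `body` only if every command is available; returns whether it ran. */
  def ifAvailable(commands: String*)(body: => Unit): Boolean = {
    val ok = commands.forall(commandAvailable)
    if (ok) body
    ok
  }
}
```

A test body wrapped in `CommandGuard.ifAvailable("bash") { ... }` would then be skipped silently on a system without `bash`, which is roughly the behaviour the PR is after.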
[GitHub] spark issue #14256: [SPARK-16620][CORE] Add back the tokenization process in...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14256 @srowen @dongjoon-hyun, thanks for the clarification. Yep I had verified manually that the java version does not need any change before I opened this PR.
[GitHub] spark pull request #14256: [SPARK-16620][CORE] Add back tokenization process...
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/14256 [SPARK-16620][CORE] Add back tokenization process in RDD.pipe(command: String) ## What changes were proposed in this pull request? Currently `RDD.pipe(command: String)`: - works only with a single command with no options specified, such as `RDD.pipe("wc")` - does not work when the command is specified with options, such as `RDD.pipe("wc -l")` This is a regression from Spark 1.6. This patch adds back the tokenization process in RDD.pipe(command: String). ## How was this patch tested? Added a test which would pass in 1.6, would fail prior to this patch, and would pass after this patch. You can merge this pull request into a Git repository by running: $ git pull https://github.com/lw-lin/spark rdd-pipe Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14256.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14256 commit 85174658b5392b5fd9773a89ee7b24a3db08c334 Author: Liwei Lin <lwl...@gmail.com> Date: 2016-07-19T05:34:46Z Fix pipe(command) & pipe(command, env)
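To make the tokenization step concrete, here is a minimal sketch of splitting a command string on whitespace with `java.util.StringTokenizer`. The object name `PipeTokenizer` is an assumption for illustration, and the sketch does not claim to match the patch line for line; note that this kind of splitting does not honour shell quoting.

```scala
import java.util.StringTokenizer
import scala.collection.mutable.ArrayBuffer

// Hypothetical standalone helper illustrating whitespace tokenization.
object PipeTokenizer {
  /** Splits `command` into words on whitespace, e.g. "wc -l" into "wc" and "-l". */
  def tokenize(command: String): Seq[String] = {
    val buf = new ArrayBuffer[String]
    val tok = new StringTokenizer(command)
    while (tok.hasMoreTokens) {
      buf += tok.nextToken()
    }
    buf.toList
  }
}
```

Without this step, a string such as `"wc -l"` would be treated as the name of a single executable, which is why the untokenized form only worked for option-free commands like `"wc"`.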
[GitHub] spark pull request #14237: [WIP][SPARK-16283][SQL] Implement `percentile_app...
Github user lw-lin closed the pull request at: https://github.com/apache/spark/pull/14237
[GitHub] spark pull request #14237: [WIP][SPARK-16283][SQL] Implement `percentile_app...
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/14237 [WIP][SPARK-16283][SQL] Implement `percentile_approx` SQL function ## What changes were proposed in this pull request? WIP ## How was this patch tested? WIP (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) You can merge this pull request into a Git repository by running: $ git pull https://github.com/lw-lin/spark percentile_approx Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14237.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14237 commit 479bf7387f0dcba41ce6ab25b7008c7fd6dd7b07 Author: Liwei Lin <lwl...@gmail.com> Date: 2016-07-17T12:53:15Z Implement function `percentile_approx`
[GitHub] spark pull request #14214: [SPARK-16545][SQL] Eliminate unnecessary rounds o...
Github user lw-lin closed the pull request at: https://github.com/apache/spark/pull/14214
[GitHub] spark issue #14214: [SPARK-16545][SQL] Eliminate unnecessary rounds of physi...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14214 Sure, let's rewrite the incremental planner to solve problems more holistically; actually I'm not satisfied with this patch either. So I'm closing this, and thank you for the ideas!
[GitHub] spark issue #14214: [SPARK-16545][SQL] Eliminate unnecessary rounds of physi...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14214 @marmbrus @zsxwing could you take a look and share some ideas? Thanks!
[GitHub] spark issue #14214: [SPARK-16545][SQL] Eliminate one unnecessary round of ph...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14214 @mariobriggs Thanks for the information! > 1 can be eliminated because 'executedPlan' is a ' lazy val' on QueryExecution ? Yea indeed. Keeping it can give us debug info, but on second thought it might not be worth it. So let's also skip it in the case of `ForeachSink`. > ... However this cannot be the solution since SparkListenerSQLExecutionStart is a public API already Yea we probably do not want to modify this public API; so what we did in this patch was to pass [3]'s `incrementalExecution` into the listener, so that physical planning is initialized only once for [2] and [3]. > ... why not keep [1] and the change to [2] be the simple case of changing L52 to the following: new Dataset(data.sparkSession, data.queryExecution, implicitly[Encoder[T]]) This is great. If reviewers decide that 2 rounds of physical planning are acceptable, then let's do it your way! > ... ConsoleSink ... but it is only for Debug purposes So maybe let us live with it.
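The point about `executedPlan` being a `lazy val` can be shown with a toy model. The names below (`PlanningRounds`, `Execution`, `planningRuns`) are hypothetical and stand in for Spark's classes only loosely; the sketch assumes nothing beyond plain Scala semantics: a `lazy val` is computed once per instance, so reusing one execution object avoids a second round, while constructing a new one triggers planning again.

```scala
// Toy model, not Spark code: counts how often "planning" runs.
object PlanningRounds {
  var planningRuns = 0

  class Execution(name: String) {
    // Computed on first access only, then cached for this instance.
    lazy val executedPlan: String = {
      planningRuns += 1
      s"plan-for-$name"
    }
  }
}
```

Accessing `executedPlan` twice on the same `Execution` increments the counter once; accessing it on a second instance increments it again, which mirrors why passing one execution object around saves a round of planning.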
[GitHub] spark pull request #14214: [SPARK-16545][SQL] Eliminate one unnecessary roun...
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/14214 [SPARK-16545][SQL] Eliminate one unnecessary round of physical planning in ForeachSink ## Problem As reported by [SPARK-16545](https://issues.apache.org/jira/browse/SPARK-16545), in `ForeachSink` we initiate 3 rounds of physical planning. Specifically: [1] In `StreamExecution`, [lastExecution.executedPlan](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L369) [2] In `ForeachSink`, [foreachPartition()](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala#L69) calls withNewExecutionId(..., **_queryExecution_**) which further calls [**_queryExecution_**.executedPlan](https://github.com/apache/spark/blob/9a5071996b968148f6b9aba12e0d3fe888d9acd8/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala#L55) [3] In `ForeachSink`, [val rdd = { ... incrementalExecution = new IncrementalExecution ...}](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala#L53) ## What changes were proposed in this pull request? [1] should not be eliminated in general; **[2] is eliminated by this patch, by replacing the `queryExecution` with `incrementalExecution` provided by [3];** [3] should be eliminated but cannot be done at this stage; let's revisit it when SPARK-16264 is resolved. ## How was this patch tested? 
- manually checked that there are only 2 rounds of physical planning in ForeachSink after this patch - existing tests ensure it causes no regression You can merge this pull request into a Git repository by running: $ git pull https://github.com/lw-lin/spark physical-3x Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14214.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14214 commit 8ec635fe7403baf5149e3f6714872bf706b37cd7 Author: Liwei Lin <lwl...@gmail.com> Date: 2016-07-15T02:12:02Z Fix foreachPartition
[GitHub] spark issue #14165: [SPARK-16503] SparkSession should provide Spark version
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14165 Jenkins retest this please
[GitHub] spark pull request #14165: [SPARK-16503] SparkSession should provide Spark v...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14165#discussion_r70570816 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala --- @@ -79,6 +79,9 @@ class SparkSession private( sparkContext.assertNotStopped() + /** The version of Spark on which this application is running. */ + def version: String = SPARK_VERSION --- End diff -- fixing this; thanks!
[GitHub] spark pull request #14165: [SPARK-16503] SparkSession should provide Spark v...
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/14165 [SPARK-16503] SparkSession should provide Spark version ## What changes were proposed in this pull request? This patch adds the following to SparkSession:

```scala
/** The version of Spark on which this application is running. */
def version: String = SPARK_VERSION
```

## How was this patch tested? Manual test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/lw-lin/spark add-version Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14165.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14165 commit 4309100777c7fd6f1fbd9081a49605fc0f8b1ff2 Author: Liwei Lin <lwl...@gmail.com> Date: 2016-07-12T23:51:35Z add version
[GitHub] spark pull request #14116: [SPARK-16452][SQL] Support basic INFORMATION_SCHE...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14116#discussion_r70243938 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/systemcatalog/InformationSchema.scala --- @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.systemcatalog + +import java.sql.{Date, Timestamp} + +import scala.collection.mutable.ArrayBuffer + +import org.apache.commons.lang3.StringUtils + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ + +/** + * INFORMATION_SCHEMA is a database consisting views which provide information about all of the + * tables, views, columns in a database. + */ +object InformationSchema { + val INFORMATION_SCHEMA = "information_schema" + /** + * Register INFORMATION_SCHEMA database. 
+ */ + def registerInformationSchema(sparkSession: SparkSession): Unit = { +sparkSession.sql(s"CREATE DATABASE IF NOT EXISTS $INFORMATION_SCHEMA") +registerView(sparkSession, new DatabasesRelationProvider, Seq("schemata", "databases")) +registerView(sparkSession, new TablesRelationProvider, Seq("tables")) +registerView(sparkSession, new ViewsRelationProvider, Seq("views")) +registerView(sparkSession, new ColumnsRelationProvider, Seq("columns")) +registerView(sparkSession, new SessionVariablesRelationProvider, Seq("session_variables")) + } + + /** + * Register a INFORMATION_SCHEMA relation provider as a temporary view of Spark Catalog. + */ + private def registerView( + sparkSession: SparkSession, + relationProvider: SchemaRelationProvider, + names: Seq[String]) { +val plan = + LogicalRelation(relationProvider.createRelation(sparkSession.sqlContext, null, null)).analyze +val projectList = plan.output.zip(plan.schema).map { + case (attr, col) => Alias(attr, col.name)() +} +sparkSession.sessionState.executePlan(Project(projectList, plan)) +for (name <- names) { + // TODO(dongjoon): This is a hack to give a database concept for Spark temporary views. + // We should generalize this later. + sparkSession.sessionState.catalog.createTempView(s"$INFORMATION_SCHEMA.$name", +plan, overrideIfExists = true) +} + } + + /** + * Compile filter array into single string condition. + */ + private[systemcatalog] def getConditionExpressionString(filters: Array[Filter]): String = { +val str = filters.flatMap(InformationSchema.compileFilter).map(p => s"($p)").mkString(" AND ") +if (str.length == 0) "TRUE" else str + } + + /** + * Convert filter into string expression. + */ + private[systemcatalog] def compileFilter(f: Filter): Option[String] = { --- End diff -- @liancheng I was thinking about https://github.com/apache/spark/pull/10541, which is also another story :-) Thanks for the information!
[GitHub] spark pull request #14116: [SPARK-16452][SQL] Support basic INFORMATION_SCHE...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14116#discussion_r70218352 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/systemcatalog/InformationSchema.scala --- @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.systemcatalog + +import java.sql.{Date, Timestamp} + +import scala.collection.mutable.ArrayBuffer + +import org.apache.commons.lang3.StringUtils + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ + +/** + * INFORMATION_SCHEMA is a database consisting views which provide information about all of the + * tables, views, columns in a database. + */ +object InformationSchema { + val INFORMATION_SCHEMA = "information_schema" + /** + * Register INFORMATION_SCHEMA database. 
+ */ + def registerInformationSchema(sparkSession: SparkSession): Unit = { +sparkSession.sql(s"CREATE DATABASE IF NOT EXISTS $INFORMATION_SCHEMA") +registerView(sparkSession, new DatabasesRelationProvider, Seq("schemata", "databases")) +registerView(sparkSession, new TablesRelationProvider, Seq("tables")) +registerView(sparkSession, new ViewsRelationProvider, Seq("views")) +registerView(sparkSession, new ColumnsRelationProvider, Seq("columns")) +registerView(sparkSession, new SessionVariablesRelationProvider, Seq("session_variables")) + } + + /** + * Register a INFORMATION_SCHEMA relation provider as a temporary view of Spark Catalog. + */ + private def registerView( + sparkSession: SparkSession, + relationProvider: SchemaRelationProvider, + names: Seq[String]) { +val plan = + LogicalRelation(relationProvider.createRelation(sparkSession.sqlContext, null, null)).analyze +val projectList = plan.output.zip(plan.schema).map { + case (attr, col) => Alias(attr, col.name)() +} +sparkSession.sessionState.executePlan(Project(projectList, plan)) +for (name <- names) { + // TODO(dongjoon): This is a hack to give a database concept for Spark temporary views. + // We should generalize this later. + sparkSession.sessionState.catalog.createTempView(s"$INFORMATION_SCHEMA.$name", +plan, overrideIfExists = true) +} + } + + /** + * Compile filter array into single string condition. + */ + private[systemcatalog] def getConditionExpressionString(filters: Array[Filter]): String = { +val str = filters.flatMap(InformationSchema.compileFilter).map(p => s"($p)").mkString(" AND ") +if (str.length == 0) "TRUE" else str + } + + /** + * Convert filter into string expression. + */ + private[systemcatalog] def compileFilter(f: Filter): Option[String] = { --- End diff -- Oh, I see. If we won't dedup this for now, let's leave a comment saying that anyone who changes one copy of this code should remember to change the other. What do you think?
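For readers following the `compileFilter` discussion, here is a minimal sketch of the pattern the quoted diff uses: each filter is compiled to an optional string fragment, unsupported filters are dropped, and the remaining fragments are parenthesized and joined with `AND`, falling back to `TRUE` when nothing remains. The `SimpleFilter` hierarchy, the `FilterCompiler` object, and the `literal` helper are hypothetical stand-ins invented for this illustration; they are not Spark's `org.apache.spark.sql.sources.Filter` classes and not the code in the PR.

```scala
// Hypothetical, simplified stand-ins for data source filters (not Spark's classes).
sealed trait SimpleFilter
final case class EqualTo(attribute: String, value: Any) extends SimpleFilter
final case class GreaterThan(attribute: String, value: Any) extends SimpleFilter
final case class Unsupported(description: String) extends SimpleFilter

object FilterCompiler {
  // Render a value as a SQL-like literal; strings are quoted and escaped.
  private def literal(value: Any): String = value match {
    case s: String => "'" + s.replace("'", "''") + "'"
    case other => other.toString
  }

  // Convert one filter into a string expression; None means "cannot be compiled".
  def compileFilter(f: SimpleFilter): Option[String] = f match {
    case EqualTo(attr, v) => Some(s"$attr = ${literal(v)}")
    case GreaterThan(attr, v) => Some(s"$attr > ${literal(v)}")
    case Unsupported(_) => None
  }

  // Compile a filter array into a single condition, mirroring the shape of
  // getConditionExpressionString in the quoted diff.
  def conditionString(filters: Array[SimpleFilter]): String = {
    val str = filters.flatMap(f => compileFilter(f)).map(p => s"($p)").mkString(" AND ")
    if (str.isEmpty) "TRUE" else str
  }
}
```

Returning `Option[String]` lets the caller silently skip filters it cannot express, which is why two copies of such logic can drift apart unnoticed if only one is updated.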
[GitHub] spark pull request #14116: [SPARK-16452][SQL] Support basic INFORMATION_SCHE...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14116#discussion_r70212106 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/systemcatalog/InformationSchema.scala --- @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.systemcatalog + +import java.sql.{Date, Timestamp} + +import scala.collection.mutable.ArrayBuffer + +import org.apache.commons.lang3.StringUtils + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ + +/** + * INFORMATION_SCHEMA is a database consisting views which provide information about all of the + * tables, views, columns in a database. + */ +object InformationSchema { + val INFORMATION_SCHEMA = "information_schema" + /** + * Register INFORMATION_SCHEMA database. 
+ */ + def registerInformationSchema(sparkSession: SparkSession): Unit = { +sparkSession.sql(s"CREATE DATABASE IF NOT EXISTS $INFORMATION_SCHEMA") +registerView(sparkSession, new DatabasesRelationProvider, Seq("schemata", "databases")) +registerView(sparkSession, new TablesRelationProvider, Seq("tables")) +registerView(sparkSession, new ViewsRelationProvider, Seq("views")) +registerView(sparkSession, new ColumnsRelationProvider, Seq("columns")) +registerView(sparkSession, new SessionVariablesRelationProvider, Seq("session_variables")) + } + + /** + * Register a INFORMATION_SCHEMA relation provider as a temporary view of Spark Catalog. + */ + private def registerView( + sparkSession: SparkSession, + relationProvider: SchemaRelationProvider, + names: Seq[String]) { +val plan = + LogicalRelation(relationProvider.createRelation(sparkSession.sqlContext, null, null)).analyze +val projectList = plan.output.zip(plan.schema).map { + case (attr, col) => Alias(attr, col.name)() +} +sparkSession.sessionState.executePlan(Project(projectList, plan)) +for (name <- names) { + // TODO(dongjoon): This is a hack to give a database concept for Spark temporary views. + // We should generalize this later. + sparkSession.sessionState.catalog.createTempView(s"$INFORMATION_SCHEMA.$name", +plan, overrideIfExists = true) +} + } + + /** + * Compile filter array into single string condition. + */ + private[systemcatalog] def getConditionExpressionString(filters: Array[Filter]): String = { +val str = filters.flatMap(InformationSchema.compileFilter).map(p => s"($p)").mkString(" AND ") +if (str.length == 0) "TRUE" else str + } + + /** + * Convert filter into string expression. + */ + private[systemcatalog] def compileFilter(f: Filter): Option[String] = { --- End diff -- This whole function does have great merit, but I feel @liancheng implemented something similar before? @liancheng, could you confirm? Thanks!
[GitHub] spark pull request #14116: [SPARK-16452][SQL] Support basic INFORMATION_SCHE...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14116#discussion_r70210589 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala --- @@ -401,7 +401,9 @@ class SessionCatalog( val db = formatDatabaseName(name.database.getOrElse(currentDb)) val table = formatTableName(name.table) val relation = -if (name.database.isDefined || !tempTables.contains(table)) { +if (db == "information_schema") { + tempTables(s"$db.$table") --- End diff -- Then I guess the constant `InformationSchema.INFORMATION_SCHEMA` should live in `sql/catalyst` rather than in `sql/core`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14116: [SPARK-16452][SQL] Support basic INFORMATION_SCHE...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14116#discussion_r70209020 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/systemcatalog/InformationSchema.scala --- @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.systemcatalog + +import java.sql.{Date, Timestamp} + +import scala.collection.mutable.ArrayBuffer + +import org.apache.commons.lang3.StringUtils + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ + +/** + * INFORMATION_SCHEMA is a database consisting views which provide information about all of the + * tables, views, columns in a database. + */ +object InformationSchema { + var INFORMATION_SCHEMA = "information_schema" + /** + * Register INFORMATION_SCHEMA database. 
+ */ + def registerInformationSchema(sparkSession: SparkSession) { --- End diff -- `def registerInformationSchema(sparkSession: SparkSession): Unit = {` maybe? Public methods usually should have explicit return types. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
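To illustrate the explicit-return-type suggestion above, here is a small sketch. The `ReturnTypeExample` object and its `register` method are hypothetical and exist only for this example; the procedure-syntax form appears in a comment so the block compiles on any Scala version.

```scala
import scala.collection.mutable.ArrayBuffer

object ReturnTypeExample {
  // Hypothetical registry used only for this illustration.
  private val registered = ArrayBuffer.empty[String]

  // Procedure syntax, as in the reviewed diff, would look like:
  //   def register(name: String) { registered += name }
  // The explicit form below states the result type right at the declaration,
  // so readers of a public API do not have to infer it from the body.
  def register(name: String): Unit = {
    registered += name
  }

  def registeredNames: Seq[String] = registered.toList
}
```

With `: Unit =` written out, accidentally dropping the `=` or changing the last expression cannot silently change what callers see.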
[GitHub] spark pull request #14116: [SPARK-16452][SQL] Support basic INFORMATION_SCHE...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14116#discussion_r70208643 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/systemcatalog/InformationSchema.scala --- @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.systemcatalog + +import java.sql.{Date, Timestamp} + +import scala.collection.mutable.ArrayBuffer + +import org.apache.commons.lang3.StringUtils + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ + +/** + * INFORMATION_SCHEMA is a database consisting views which provide information about all of the + * tables, views, columns in a database. + */ +object InformationSchema { + var INFORMATION_SCHEMA = "information_schema" + /** + * Register INFORMATION_SCHEMA database. 
+ */ + def registerInformationSchema(sparkSession: SparkSession) { +sparkSession.sql("CREATE DATABASE IF NOT EXISTS information_schema") --- End diff -- `s"CREATE DATABASE IF NOT EXISTS ${INFORMATION_SCHEMA}"` ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14116: [SPARK-16452][SQL] Support basic INFORMATION_SCHE...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14116#discussion_r70208504 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala --- @@ -425,7 +427,9 @@ class SessionCatalog( def tableExists(name: TableIdentifier): Boolean = synchronized { val db = formatDatabaseName(name.database.getOrElse(currentDb)) val table = formatTableName(name.table) -if (name.database.isDefined || !tempTables.contains(table)) { +if (db == "information_schema") { --- End diff -- same here --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14116: [SPARK-16452][SQL] Support basic INFORMATION_SCHE...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14116#discussion_r70208490 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala --- @@ -401,7 +401,9 @@ class SessionCatalog( val db = formatDatabaseName(name.database.getOrElse(currentDb)) val table = formatTableName(name.table) val relation = -if (name.database.isDefined || !tempTables.contains(table)) { +if (db == "information_schema") { + tempTables(s"$db.$table") --- End diff -- `if (db == InformationSchema.INFORMATION_SCHEMA)` maybe? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14116: [SPARK-16452][SQL] Support basic INFORMATION_SCHE...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14116#discussion_r70208420 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/systemcatalog/InformationSchema.scala --- @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.systemcatalog + +import java.sql.{Date, Timestamp} + +import scala.collection.mutable.ArrayBuffer + +import org.apache.commons.lang3.StringUtils + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ + +/** + * INFORMATION_SCHEMA is a database consisting views which provide information about all of the + * tables, views, columns in a database. + */ +object InformationSchema { + var INFORMATION_SCHEMA = "information_schema" --- End diff -- `val` maybe? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
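The review comments on this PR about `val`, string interpolation, and comparing against the constant all point the same way: define the database name once and reference it everywhere. A minimal sketch, assuming a hypothetical `InformationSchemaNames` object with helper methods invented for illustration (only the `INFORMATION_SCHEMA` name and its value come from the quoted diff):

```scala
object InformationSchemaNames {
  // `val`, not `var`: the name is a constant and should not be reassignable.
  val INFORMATION_SCHEMA = "information_schema"

  // Build the statement from the constant instead of repeating the literal.
  def createDatabaseSql: String = s"CREATE DATABASE IF NOT EXISTS $INFORMATION_SCHEMA"

  // Compare against the constant rather than a hard-coded string.
  def isInformationSchema(db: String): Boolean = db == INFORMATION_SCHEMA

  // Qualified name in the same shape as the s"$db.$table" key in the diff.
  def qualifiedViewName(table: String): String = s"$INFORMATION_SCHEMA.$table"
}
```

Where the constant should live (for example `sql/catalyst` versus `sql/core`) is a separate question raised in the thread; this sketch does not take a position on it.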
[GitHub] spark issue #14118: [SPARK-16462][SPARK-16460][SPARK-15144][SQL] Make CSV ca...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14118

I think @HyukjinKwon has made a good point: it's kind of strange that null strings can be written out but cannot be read back as nulls. So for `StringType`:

| | nulls write & read | consistent with 1.6? |
|---|---|---|
| option (a) | null strings can be written out, but can NOT be read back as nulls | yes |
| option (b) | null strings can be written out, and can be read back as nulls | NO |

@HyukjinKwon and I are somewhat inclined to option (b) because it sounds reasonable to end users. @rxin, would you make a final decision? Thanks!
[GitHub] spark issue #14118: [SPARK-16462][SPARK-16460][SQL] Make CSV cast null value...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14118 @HyukjinKwon hi. The explanation above intends to help reviewers better understand how we introduced the regression. Regarding whether `StringType` should be ignored or not, I don't have strong preference :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14118: [SPARK-16462][SPARK-16460][SQL] Make CSV cast nul...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14118#discussion_r70182426 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala --- @@ -238,59 +238,55 @@ private[csv] object CSVTypeCast { nullable: Boolean = true, options: CSVOptions = CSVOptions()): Any = { -castType match { - case _: ByteType => if (datum == options.nullValue && nullable) null else datum.toByte - case _: ShortType => if (datum == options.nullValue && nullable) null else datum.toShort - case _: IntegerType => if (datum == options.nullValue && nullable) null else datum.toInt - case _: LongType => if (datum == options.nullValue && nullable) null else datum.toLong - case _: FloatType => -if (datum == options.nullValue && nullable) { - null -} else if (datum == options.nanValue) { - Float.NaN -} else if (datum == options.negativeInf) { - Float.NegativeInfinity -} else if (datum == options.positiveInf) { - Float.PositiveInfinity -} else { - Try(datum.toFloat) - .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue()) -} - case _: DoubleType => -if (datum == options.nullValue && nullable) { - null -} else if (datum == options.nanValue) { - Double.NaN -} else if (datum == options.negativeInf) { - Double.NegativeInfinity -} else if (datum == options.positiveInf) { - Double.PositiveInfinity -} else { - Try(datum.toDouble) - .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue()) -} - case _: BooleanType => datum.toBoolean - case dt: DecimalType => -if (datum == options.nullValue && nullable) { - null -} else { +if (datum == options.nullValue && nullable && (!castType.isInstanceOf[StringType])) { --- End diff -- > ... why StringType is excluded? Hi @HyukjinKwon, it's just to stay consistent with what we did in `spark-csv` for 1.6. Actually I don't have a strong preference here -- maybe we should not ignore `StringType`? @rxin could you share some thoughts? Thanks!
[GitHub] spark issue #14118: [SPARK-16462][SPARK-16460][SQL] Make CSV cast null value...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14118

FYI, before [SPARK-14143](https://issues.apache.org/jira/browse/SPARK-14143), null values had been handled this way:

```scala
if (datum == options.nullValue && nullable && (!castType.isInstanceOf[StringType]))
```

Then in [SPARK-14143](https://issues.apache.org/jira/browse/SPARK-14143), it was first broken down per numeric data type in https://github.com/apache/spark/pull/11947/commits/93ac6bb3eb63efb775b48af090a37a6cbe4f30c4 to handle a byte-specific null value, a short-specific null value, an int-specific null value, ...:

```scala
case _: ByteType => if (datum == params.byteNullValue && nullable) null else datum.toByte
case _: ShortType => if (datum == params.shortNullValue && nullable) null else datum.toShort
case _: IntegerType => if (datum == params.integerNullValue && nullable) null else datum.toInt
...
```

Then in https://github.com/apache/spark/pull/11947/commits/698b4b41baa1ebd5d66ea6242bcb39bcd0887f8b, the byte-specific, short-specific, int-specific, ... null values were reduced back to one single null value:

```scala
case _: ByteType => if (datum == params.nullValue && nullable) null else datum.toByte
case _: ShortType => if (datum == params.nullValue && nullable) null else datum.toShort
case _: IntegerType => if (datum == params.nullValue && nullable) null else datum.toInt
```

Along with that change, we introduced a regression in handling non-numeric data types like `BooleanType` etc. Since we don't need to handle type-specific null values, this patch switches back to the way we handled null values in the 1.6 days (and thus fixes the regression):

```scala
if (datum == options.nullValue && nullable && (!castType.isInstanceOf[StringType]))
```
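As a rough, self-contained illustration of the single up-front null check described above, the sketch below applies the check once before any type-specific parsing, so every non-string type gets the same null handling. The `SimpleType` hierarchy, the `NullAwareCast` object, and the `nullValue` parameter with its empty-string default are assumptions made for this example; the real code works on Spark's `DataType` and `CSVOptions`.

```scala
// Hypothetical stand-ins for Spark SQL data types, for illustration only.
sealed trait SimpleType
case object StringT extends SimpleType
case object IntT extends SimpleType
case object BooleanT extends SimpleType

object NullAwareCast {
  // `nullValue` plays the role of options.nullValue; the "" default is an assumption.
  def castTo(
      datum: String,
      castType: SimpleType,
      nullable: Boolean = true,
      nullValue: String = ""): Any = {
    // One check for all types except strings, mirroring the 1.6-style condition.
    if (datum == nullValue && nullable && castType != StringT) {
      null
    } else {
      castType match {
        case IntT => datum.toInt
        case BooleanT => datum.toBoolean
        case StringT => datum
      }
    }
  }
}
```

Because the check sits outside the `match`, a newly added type cannot forget it, which is the kind of omission that affected `BooleanType` in the per-type version. Under this sketch a string equal to `nullValue` is still returned as-is, which corresponds to the behaviour the thread debates changing.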
[GitHub] spark issue #14118: [SPARK-16462][SPARK-16460][SQL] Make CSV cast null value...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14118 The diff that github shows is a mess. The actual diff (which is quite small) is: ![diff](https://cloud.githubusercontent.com/assets/15843379/16711624/db6faf94-4697-11e6-8c56-53f10711aea5.png) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14118: [SPARK-16462][SPARK-16460][SQL] Make CSV cast nul...
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/14118

[SPARK-16462][SPARK-16460][SQL] Make CSV cast null values properly

## What changes were proposed in this pull request?

When casting a given string datum to a specified type, CSV should return `null` for nullable types if `datum == options.nullValue`. However, for certain data types like `Boolean`, `TimestampType`, `DateType`, CSV in 2.0 does not return `null` for some "empty" datum. This is a regression compared to 1.6. This patch fixes this.

## How was this patch tested?

New test cases.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lw-lin/spark csv-cast-null

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14118.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #14118

commit e782616498bcfc50398c2b560c3adf1512099d4f
Author: Liwei Lin <lwl...@gmail.com>
Date: 2016-07-09T13:39:50Z

    cast null correctly
[GitHub] spark issue #14030: [SPARK-16350][SQL] Fix support for incremental planning ...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14030 Updated. @zsxwing could you take another look? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14030: [SPARK-16350][SQL] Fix support for incremental pl...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14030#discussion_r69856084 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala --- @@ -30,7 +32,42 @@ import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter} class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable { override def addBatch(batchId: Long, data: DataFrame): Unit = { -data.as[T].foreachPartition { iter => +// TODO: Refine this method when SPARK-16264 is resolved; see comments below. --- End diff -- The logical plan of the created `IncrementalExecution` has to be `deserialized`[#L55](https://github.com/apache/spark/pull/14030/files#diff-98acda846a9dd63efc42e0957594e05dR55), so we should not re-use the `QueryExecution` passed in? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14030: [SPARK-16350][SQL] Fix support for incremental pl...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14030#discussion_r69855749 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala --- @@ -35,35 +35,109 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf sqlContext.streams.active.foreach(_.stop()) } - test("foreach") { + test("foreach() with `append` output mode") { withTempDir { checkpointDir => val input = MemoryStream[Int] val query = input.toDS().repartition(2).writeStream .option("checkpointLocation", checkpointDir.getCanonicalPath) +.outputMode("append") .foreach(new TestForeachWriter()) .start() + + // -- batch 0 --- input.addData(1, 2, 3, 4) query.processAllAvailable() - val expectedEventsForPartition0 = Seq( + var expectedEventsForPartition0 = Seq( ForeachSinkSuite.Open(partition = 0, version = 0), ForeachSinkSuite.Process(value = 1), ForeachSinkSuite.Process(value = 3), ForeachSinkSuite.Close(None) ) - val expectedEventsForPartition1 = Seq( + var expectedEventsForPartition1 = Seq( ForeachSinkSuite.Open(partition = 1, version = 0), ForeachSinkSuite.Process(value = 2), ForeachSinkSuite.Process(value = 4), ForeachSinkSuite.Close(None) ) - val allEvents = ForeachSinkSuite.allEvents() + var allEvents = ForeachSinkSuite.allEvents() + assert(allEvents.size === 2) + assert { +allEvents === Seq(expectedEventsForPartition0, expectedEventsForPartition1) || + allEvents === Seq(expectedEventsForPartition1, expectedEventsForPartition0) + } + + ForeachSinkSuite.clear() + + // -- batch 1 --- + input.addData(5, 6, 7, 8) + query.processAllAvailable() + + expectedEventsForPartition0 = Seq( +ForeachSinkSuite.Open(partition = 0, version = 1), +ForeachSinkSuite.Process(value = 5), +ForeachSinkSuite.Process(value = 7), +ForeachSinkSuite.Close(None) + ) + expectedEventsForPartition1 = Seq( +ForeachSinkSuite.Open(partition = 1, version = 1), +ForeachSinkSuite.Process(value = 6), 
+ForeachSinkSuite.Process(value = 8), +ForeachSinkSuite.Close(None) + ) + + allEvents = ForeachSinkSuite.allEvents() assert(allEvents.size === 2) assert { allEvents === Seq(expectedEventsForPartition0, expectedEventsForPartition1) || allEvents === Seq(expectedEventsForPartition1, expectedEventsForPartition0) } + + query.stop() +} + } + + test("foreach() with `complete` output mode") { +withTempDir { checkpointDir => + val input = MemoryStream[Int] + + val query = input.toDS() +.groupBy().count().as[Long].map(_.toInt) +.writeStream +.option("checkpointLocation", checkpointDir.getCanonicalPath) +.outputMode("complete") --- End diff -- fixed; thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14030: [SPARK-16350][SQL] Fix support for incremental pl...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14030#discussion_r69855731 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala --- @@ -35,35 +35,109 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf sqlContext.streams.active.foreach(_.stop()) } - test("foreach") { + test("foreach() with `append` output mode") { withTempDir { checkpointDir => val input = MemoryStream[Int] val query = input.toDS().repartition(2).writeStream .option("checkpointLocation", checkpointDir.getCanonicalPath) +.outputMode("append") .foreach(new TestForeachWriter()) .start() + + // -- batch 0 --- input.addData(1, 2, 3, 4) query.processAllAvailable() - val expectedEventsForPartition0 = Seq( + var expectedEventsForPartition0 = Seq( ForeachSinkSuite.Open(partition = 0, version = 0), ForeachSinkSuite.Process(value = 1), ForeachSinkSuite.Process(value = 3), ForeachSinkSuite.Close(None) ) - val expectedEventsForPartition1 = Seq( + var expectedEventsForPartition1 = Seq( ForeachSinkSuite.Open(partition = 1, version = 0), ForeachSinkSuite.Process(value = 2), ForeachSinkSuite.Process(value = 4), ForeachSinkSuite.Close(None) ) - val allEvents = ForeachSinkSuite.allEvents() + var allEvents = ForeachSinkSuite.allEvents() + assert(allEvents.size === 2) + assert { +allEvents === Seq(expectedEventsForPartition0, expectedEventsForPartition1) || + allEvents === Seq(expectedEventsForPartition1, expectedEventsForPartition0) --- End diff -- fixed. @jaceklaskowski @zsxwing thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request #14030: [SPARK-16350][SQL] Fix support for incremental pl...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14030#discussion_r69855673 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -155,7 +155,7 @@ private[sql] object Dataset { class Dataset[T] private[sql]( @transient val sparkSession: SparkSession, @DeveloperApi @transient val queryExecution: QueryExecution, -encoder: Encoder[T]) +val encoder: Encoder[T]) --- End diff -- this change has been reverted; thanks!
[GitHub] spark pull request #14030: [SPARK-16350][SQL] Fix support for incremental pl...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/14030#discussion_r69855694 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala --- @@ -30,7 +32,42 @@ import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter} class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable { override def addBatch(batchId: Long, data: DataFrame): Unit = { -data.as[T].foreachPartition { iter => +// TODO: Refine this method when SPARK-16264 is resolved; see comments below. + +// This logic should've been as simple as: +// ``` +// data.as[T].foreachPartition { iter => ... } +// ``` +// +// Unfortunately, doing that would just break the incremental planing. The reason is, +// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` just +// does not support `IncrementalExecution`. +// +// So as a provisional fix, below we've made a special version of `Dataset` with its `rdd()` +// method supporting incremental planning. But in the long run, we should generally make newly +// created Datasets use `IncrementalExecution` where necessary (which is SPARK-16264 tries to +// resolve). + +val dataAsT = data.as[T] +val datasetWithIncrementalExecution = + new Dataset(data.sparkSession, dataAsT.logicalPlan, dataAsT.encoder) { --- End diff -- fixed; thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #14030: [SPARK-16350][SQL] Fix support for incremental planning ...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14030 @zsxwing could you take a look at this when you have time? Thanks!
[GitHub] spark issue #13804: [Minor][Core] Fix display wrong free memory size in the ...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/13804 hi @jerryshao, let's also back-port this into 1.6.x ([MemoryStore.scala#L395](https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L395)) maybe?
[GitHub] spark issue #14030: [SPARK-16350][SQL] Fix support for incremental planning ...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14030 @zsxwing could you take a look at this? Thanks!
[GitHub] spark issue #14030: [SPARK-16350][SQL] Fix support for incremental planning ...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14030 Jenkins retest this please
[GitHub] spark issue #14030: [WIP][SPARK-16350][SQL] Fix `foreach` for streaming Data...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/14030 Jenkins retest this please
[GitHub] spark pull request #14030: [WIP][SPARK-16350][SQL] Fix `foreach` for streami...
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/14030 [WIP][SPARK-16350][SQL] Fix `foreach` for streaming Dataset ## What changes were proposed in this pull request? - [x] add tests - [ ] fix `foreach` ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) You can merge this pull request into a Git repository by running: $ git pull https://github.com/lw-lin/spark fix-foreach-complete Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14030.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14030 commit f3f60f919a2070a6946d0d908b54225d3c2263fc Author: Liwei Lin <lwl...@gmail.com> Date: 2016-07-02T14:56:06Z Add test(`complete`) & expand test(`append`) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13978: [SPARK-16256][DOCS] Minor fixes on the Structured Stream...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/13978 The programming guide is totally awesome, thanks @tdas! Seems like there is one minor issue: we should also count this `12:11 dog` into window `12:05-12:15`, right? ![ssx](https://cloud.githubusercontent.com/assets/15843379/16483429/f2c49438-3ee6-11e6-9ac5-bcb3b38a9842.png)
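The window-membership question in the comment above can be checked with a little arithmetic. This is only a minimal sketch, assuming 10-minute windows that slide every 5 minutes (the window parameters are an assumption here, not stated in the comment); `SlidingWindows`, `windowsContaining` and `fmt` are hypothetical names, not Spark API.

```scala
object SlidingWindows {
  /** All [start, end) windows, in minutes since midnight, that contain minute `t`.
    * Assumes t >= 0 and slide > 0; starts before midnight are not filtered out. */
  def windowsContaining(t: Int, length: Int, slide: Int): List[(Int, Int)] = {
    // Latest window start that is not after t
    val latestStart = t - (t % slide)
    Iterator.iterate(latestStart)(_ - slide)
      .takeWhile(start => start > t - length) // window must still cover t
      .map(start => (start, start + length))
      .toList
      .reverse                                // earliest window first
  }

  /** Formats minutes since midnight as HH:mm. */
  def fmt(minutes: Int): String = f"${minutes / 60}%02d:${minutes % 60}%02d"
}
```

Under these assumptions, `SlidingWindows.windowsContaining(12 * 60 + 11, 10, 5)` yields the windows starting at 12:05 and 12:10, so an event at `12:11` would belong to both `12:05-12:15` and `12:10-12:20`.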
[GitHub] spark issue #13685: [SPARK-15963][CORE] Catch `TaskKilledException` correctl...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/13685 Thanks, @squito @markhamstra !
[GitHub] spark issue #13685: [SPARK-15963][CORE] Catch `TaskKilledException` correctl...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/13685 Addressed all comments. @squito would you take another look? Thanks!
[GitHub] spark pull request #13685: [SPARK-15963][CORE] Catch `TaskKilledException` c...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/13685#discussion_r68165895 --- Diff: core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala --- @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.executor + +import java.nio.ByteBuffer +import java.util.concurrent.CountDownLatch + +import scala.collection.mutable.HashMap + +import org.mockito.Matchers._ +import org.mockito.Mockito.{mock, when} +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer + +import org.apache.spark._ +import org.apache.spark.TaskState.TaskState +import org.apache.spark.memory.MemoryManager +import org.apache.spark.rpc.RpcEnv +import org.apache.spark.scheduler.{FakeTask, Task} +import org.apache.spark.serializer.JavaSerializer + +class ExecutorSuite extends SparkFunSuite { + + test("SPARK-15963: Catch `TaskKilledException` correctly in Executor.TaskRunner") { +// mock some objects to make Executor.launchTask() happy +val conf = new SparkConf +val serializer = new JavaSerializer(conf) +val mockEnv = mock(classOf[SparkEnv]) +val mockRpcEnv = mock(classOf[RpcEnv]) +val mockMemoryManager = mock(classOf[MemoryManager]) +when(mockEnv.conf).thenReturn(conf) +when(mockEnv.serializer).thenReturn(serializer) +when(mockEnv.rpcEnv).thenReturn(mockRpcEnv) +when(mockEnv.memoryManager).thenReturn(mockMemoryManager) +when(mockEnv.closureSerializer).thenReturn(serializer) +val serializedTask = + Task.serializeWithDependencies( +new FakeTask(0), +HashMap[String, Long](), +HashMap[String, Long](), +serializer.newInstance()) + +// the program should run in this order: +// +-+--+ +// | main test thread | worker thread | +// +-+--+ +// |executor.launchTask()| | +// | | TaskRunner.run() begins | +// | | ... | +// | | execBackend.statusUpdate // 1st time, #L240 | +// | executor.killAllTasks(true) | | +// | | ... | +// | | task = ser.deserialize // #L253 | +// | | ... | +// | | execBackend.statusUpdate // 2nd time, #L365 | +// | | ... 
| +// | | TaskRunner.run() ends | +// | check results | | +// +-+--+ + +val mockExecutorBackend = mock(classOf[ExecutorBackend]) +when(mockExecutorBackend.statusUpdate(any(), any(), any())) + .thenAnswer(new Answer[Unit] { +var firstTime = true +override def answer(invocationOnMock: InvocationOnMock): Unit = { + if (firstTime) { +TestHelper.latch1.countDown() +// here between latch1 and latch2, executor.killAllTasks() is called +TestHelper.latch2.await() +firstTime = false + } + else { +val taskState = invocationOnMock.getArguments()(1).asInstanceOf[TaskState] +/
[GitHub] spark issue #13685: [SPARK-15963][CORE] Catch `TaskKilledException` correctl...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/13685

Hi @squito thanks for the comments!

> how you'd have a TaskKilledException, but without setting the task to `killed`

This can be reproduced when a task gets killed ([Executor#L235~L252](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L235~L252)) before it can be deserialized ([Executor#L253](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L253)). Then in [TaskRunner.kill()](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L211), `task` is still `null`, so only `taskRunner.killed` is set to `true` but not `task.killed`.

The updated test reproduces this. In the test, we use `mockExecutorBackend` to trigger `executor.killAllTasks(true)` at Executor#L240; the program should run in this order:

```
+-----------------------------+----------------------------------------------+
|      main test thread       |                worker thread                 |
+-----------------------------+----------------------------------------------+
| executor.launchTask()       |                                              |
|                             | TaskRunner.run() begins                      |
|                             | ...                                          |
|                             | execBackend.statusUpdate  // 1st time, #L240 |
| executor.killAllTasks(true) |                                              |
|                             | ...                                          |
|                             | task = ser.deserialize    // #L253           |
|                             | ...                                          |
|                             | execBackend.statusUpdate  // 2nd time, #L365 |
|                             | ...                                          |
|                             | TaskRunner.run() ends                        |
| check results               |                                              |
+-----------------------------+----------------------------------------------+
```

Then:

|                  | prior to this patch | after this patch |
|------------------|---------------------|------------------|
| testFailedReason | ExceptionFailure    | TaskKilled       |
| taskState        | FAILED              | KILLED           |
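The ordering described in the comment above relies on a two-latch handshake between the test thread and the worker thread. The following is a minimal sketch of that handshake only, with no Spark classes involved; `LatchHandshake` and the event strings are hypothetical, and the real test drives this from a mocked `ExecutorBackend` rather than a hand-written thread.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, CountDownLatch}

object LatchHandshake {
  /** Runs the handshake once and returns the events in the order they happened. */
  def run(): List[String] = {
    val events = new CopyOnWriteArrayList[String]()
    val latch1 = new CountDownLatch(1) // worker -> main: "first status update reached"
    val latch2 = new CountDownLatch(1) // main -> worker: "kill has been issued"

    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        events.add("worker: statusUpdate (1st time)")
        latch1.countDown()   // let the main thread proceed
        latch2.await()       // block until the kill has happened
        events.add("worker: deserialize task")
      }
    })
    worker.start()

    latch1.await()           // wait for the worker's first status update
    events.add("main: killAllTasks")
    latch2.countDown()       // release the worker
    worker.join()

    (0 until events.size).map(i => events.get(i)).toList
  }
}
```

Because each side blocks on the other's latch, the kill is guaranteed to land between the first status update and the deserialization step, which is the window the comment describes.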
[GitHub] spark issue #13652: [SPARK-15613] [SQL] Fix incorrect days to millis convers...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/13652

hi @davies, two tests still fail after this patch when I build locally:

```
- from UTC timestamp *** FAILED ***
  "2016-03-13 0[2]:00:00.0" did not equal "2016-03-13 0[3]:00:00.0" (DateTimeUtilsSuite.scala:488)
- to UTC timestamp *** FAILED ***
  "2016-03-13 1[1]:00:00.0" did not equal "2016-03-13 1[0]:00:00.0" (DateTimeUtilsSuite.scala:506)
```

My time zone:

```
admin$ sudo systemsetup -gettimezone
Time Zone: Asia/Shanghai
```

Can you take a look? Thanks!
[GitHub] spark pull request #13518: [WIP][SPARK-15472][SQL] Add support for writing i...
Github user lw-lin closed the pull request at: https://github.com/apache/spark/pull/13518
[GitHub] spark issue #13705: [SPARK-15472][SQL] Add support for writing in `csv` form...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/13705 I personally feel that it would be great if we could also support writing in `csv`, `json`, `txt` formats in Structured Streaming for the 2.0 release (I'd like to submit patches for `json`, `txt` very soon if possible). @marmbrus @tdas @zsxwing please let me know what you think and whether you have any ideas to improve this patch. Thanks a lot!
[GitHub] spark pull request #13705: [SPARK-15472][SQL] Add support for writing in `cs...
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/13705

[SPARK-15472][SQL] Add support for writing in `csv` format in Structured Streaming

## What changes were proposed in this pull request?

This patch adds support for writing in `csv` format in Structured Streaming:

**1. At a high level, this patch forms the following class hierarchy**:

```
                       ↑
              CSVOutputWriterBase
              ↑                 ↑
(anonymous batch)             (anonymous streaming)
CSVOutputWriter               CSVOutputWriter
                              [write data without using an OutputCommitter]
```

```
            ↑                               ↑
BatchCSVOutputWriterFactory   StreamingCSVOutputWriterFactory
```

The streaming CSVOutputWriter would write data **without** using an `OutputCommitter`, which was the same approach taken by [SPARK-14716](https://github.com/apache/spark/pull/12409).

**2. To support compression, this patch attaches an extension to the path assigned by `FileStreamSink`**. E.g., if we write out using the `gzip` compression and `FileStreamSink` assigns path `${uuid}` to the output writer, then in the end the file written out will be `${uuid}.csv.gz`. This way when we read the file back, we should be able to interpret it correctly as `gzip` compressed. This is slightly different from [SPARK-14716](https://github.com/apache/spark/pull/12409).

## How was this patch tested?
`FileStreamSinkSuite` is expanded to cover `csv` format: ```scala test("csv - unpartitioned data - codecs: none/gzip") test("csv - partitioned data - codecs: none/gzip") test("csv - unpartitioned writing and batch reading - codecs: none/gzip") test("csv - partitioned writing and batch reading - codecs: none/gzip") ``` You can merge this pull request into a Git repository by running: $ git pull https://github.com/lw-lin/spark csv-for-ss Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/13705.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #13705 commit 9869f9885e4fdc7364cd46ab05b1f332921ff8d7 Author: Liwei Lin <lwl...@gmail.com> Date: 2016-06-16T05:38:13Z Add support for writing in `csv` format --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
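The path-extension rule from point 2 of the pull request description can be sketched as a small pure function. This is an illustration under stated assumptions, not the patch's actual code: `OutputPathSketch` and `withExtensions` are hypothetical names, and only the `gzip` to `.gz` mapping is taken from the description.

```scala
object OutputPathSketch {
  /** Appends a format extension and, if a codec is given, a compression extension.
    * Only "gzip" is handled, since it is the only codec the description mentions. */
  def withExtensions(basePath: String, formatExt: String, codec: Option[String]): String = {
    val codecExt = codec match {
      case Some("gzip") => ".gz"
      case Some(other)  =>
        throw new IllegalArgumentException(s"codec not covered by this sketch: $other")
      case None         => ""
    }
    basePath + formatExt + codecExt
  }
}
```

With a base path standing in for `${uuid}`, `OutputPathSketch.withExtensions("some-uuid", ".csv", Some("gzip"))` gives `some-uuid.csv.gz`, so a reader can infer both the format and the compression from the file name alone.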
[GitHub] spark issue #13518: [WIP][SPARK-15472][SQL] Add support for writing in `csv`...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/13518 Jenkins retest this please
[GitHub] spark pull request #13518: [WIP][SPARK-15472][SQL] Add support for writing i...
GitHub user lw-lin reopened a pull request: https://github.com/apache/spark/pull/13518

[WIP][SPARK-15472][SQL] Add support for writing in `csv`, `json`, `text` formats in Structured Streaming

## What changes were proposed in this pull request?

This patch adds support for writing in `csv`, `json`, `text` formats in Structured Streaming:

**1. at a high level, this patch forms the following hierarchy** (`text` as an example):

```
                  ↑
         TextOutputWriterBase
          ↑                ↑
BatchTextOutputWriter   StreamingTextOutputWriter
```

```
            ↑                             ↑
BatchTextOutputWriterFactory   StreamingOutputWriterFactory
                                          ↑
                               StreamingTextOutputWriterFactory
```

The `StreamingTextOutputWriter` and other 'streaming' output writers would write data **without** using an `OutputCommitter`. This was the same approach taken by [SPARK-14716](https://github.com/apache/spark/pull/12409).

**2. to support compression, this patch attaches an extension to the path assigned by `FileStreamSink`**, which is slightly different from [SPARK-14716](https://github.com/apache/spark/pull/12409). For example, if we are writing out using the `gzip` compression and `FileStreamSink` assigns path `${uuid}` to a text writer, then in the end the file written out will be `${uuid}.txt.gz` -- so that when we read the file back, we'll correctly interpret it as `gzip` compressed.

## How was this patch tested?
`FileStreamSinkSuite` is expanded much more to cover the added `csv`, `json`, `text` formats: ```scala test(" csv - unpartitioned data - codecs: none/gzip") test("json - unpartitioned data - codecs: none/gzip") test("text - unpartitioned data - codecs: none/gzip") test(" csv - partitioned data - codecs: none/gzip") test("json - partitioned data - codecs: none/gzip") test("text - partitioned data - codecs: none/gzip") test(" csv - unpartitioned writing and batch reading - codecs: none/gzip") test("json - unpartitioned writing and batch reading - codecs: none/gzip") test("text - unpartitioned writing and batch reading - codecs: none/gzip") test(" csv - partitioned writing and batch reading - codecs: none/gzip") test("json - partitioned writing and batch reading - codecs: none/gzip") test("text - partitioned writing and batch reading - codecs: none/gzip") ``` You can merge this pull request into a Git repository by running: $ git pull https://github.com/lw-lin/spark add-csv-json-text-for-ss Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/13518.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #13518 commit c70083e9f76c20f6bf48e7ec821452f9bf63783a Author: Liwei Lin <lwl...@gmail.com> Date: 2016-06-05T09:03:04Z Add csv, json, text commit bc28f4112ca9eca6a9f1602a891dd0388fa3185c Author: Liwei Lin <lwl...@gmail.com> Date: 2016-06-09T03:31:59Z Fix parquet extension --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13575: [SPARK-15472][SQL] Add support for writing in `cs...
Github user lw-lin closed the pull request at: https://github.com/apache/spark/pull/13575
[GitHub] spark issue #13685: [SPARK-15963][CORE] Catch `TaskKilledException` correctl...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/13685

I couldn't come up with good syntax to express something like

```scala
case e @ (_: TaskKilledException) | (_: InterruptedException if task.killed) => ...
```

So this patch simply breaks the case into two separate ones.

@kayousterhout @markhamstra @squito, would you mind taking a look? Thanks!
[GitHub] spark pull request #13685: [SPARK-15963][CORE] Catch `TaskKilledException` c...
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/13685

[SPARK-15963][CORE] Catch `TaskKilledException` correctly in Executor.TaskRunner

## What changes were proposed in this pull request?

Currently in [Executor.TaskRunner](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L362), we have:

```scala
try {...}
catch {
  case _: TaskKilledException | _: InterruptedException if task.killed => ...
}
```

What we intended was:
- `TaskKilledException` **OR** **(**`InterruptedException` **AND** `task.killed`**)**

But what the code actually does is:
- **(**`TaskKilledException` **OR** `InterruptedException`**)** **AND** `task.killed`

As a consequence, we sometimes fail to catch `TaskKilledException` and incorrectly report the task status as `FAILED` (when it should really be `KILLED`). This patch fixes this.

## How was this patch tested?

This should be easy to reason about (also, we don't have any test case for TaskRunner yet?)

You can merge this pull request into a Git repository by running: $ git pull https://github.com/lw-lin/spark fix-task-killed

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/13685.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #13685

commit 777959e6a6b9a2e21a32aec6b0bd6850d6513474 Author: Liwei Lin <lwl...@gmail.com> Date: 2016-06-15T14:09:22Z Break the case killed and interrupted into two
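The guard-precedence issue described in this pull request can be reproduced outside Spark. The sketch below is an illustration only: `TaskKilledException` here is a local stand-in class, `taskKilled` stands in for `task.killed`, and `GuardPrecedence`, `before` and `after` are hypothetical names. It relies on the Scala rule that an `if` guard applies to the whole `case` pattern, including every `|` alternative.

```scala
// Local stand-in for Spark's TaskKilledException (assumption for this sketch)
final class TaskKilledException extends RuntimeException("task killed")

object GuardPrecedence {
  /** Shape of the original clause: the guard covers BOTH alternatives. */
  def before(e: Throwable, taskKilled: Boolean): String = e match {
    case _: TaskKilledException | _: InterruptedException if taskKilled => "KILLED"
    case _ => "FAILED"
  }

  /** Shape after splitting into two cases: the guard covers only the interrupt. */
  def after(e: Throwable, taskKilled: Boolean): String = e match {
    case _: TaskKilledException => "KILLED"
    case _: InterruptedException if taskKilled => "KILLED"
    case _ => "FAILED"
  }
}
```

With `taskKilled = false`, `before(new TaskKilledException, false)` falls through to `"FAILED"`, while `after(new TaskKilledException, false)` returns `"KILLED"`; an `InterruptedException` is still treated as a kill only when the flag is set.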
[GitHub] spark issue #13683: [SPARK-15518][Core][Follow-up] Rename LocalSchedulerBack...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/13683 @rxin would you mind taking a look? Thanks!
[GitHub] spark pull request #13683: [SPARK-15518][Core][Follow-up] Rename LocalSchedu...
GitHub user lw-lin opened a pull request:

https://github.com/apache/spark/pull/13683

[SPARK-15518][Core][Follow-up] Rename LocalSchedulerBackendEndpoint -> LocalSchedulerBackend

## What changes were proposed in this pull request?

This patch is a follow-up to https://github.com/apache/spark/pull/13288 completing the renaming `LocalScheduler` -> `LocalSchedulerBackend`.

## How was this patch tested?

Updated test cases to reflect the name change.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lw-lin/spark rename-backend

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/13683.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #13683

commit 9b520ae581653db93512e1a096095c83ccb7157a
Author: Liwei Lin <lwl...@gmail.com>
Date: 2016-06-15T08:48:18Z

Make local scheduler backend class name consistent
[GitHub] spark issue #11996: [SPARK-10530] [CORE] Kill other task attempts when one t...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/11996 @devaraj-kavali @kayousterhout this is good to have, but I wonder whether it could cause resources to leak. For example, what happens if the task is in the middle of releasing resources in a `finally` block -- like [Executor.scala#L281](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L281) -- when it gets killed and interrupted? Thanks!
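The concern raised on pull request #11996 can be illustrated with plain JVM threading semantics. This is a hedged sketch only: `InterruptedCleanup` and its step names are hypothetical, it simulates the kill by setting the current thread's interrupt flag, and it does not claim that Spark's `finally` block at the linked line behaves this way. It shows only that an interrupt arriving before a blocking call inside `finally` can skip the cleanup steps that follow it.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical illustration only; none of these names come from Spark.
object InterruptedCleanup {
  // Runs a fake task whose `finally` block has two release steps separated by
  // a blocking call, and returns the steps that actually completed.
  def run(): List[String] = {
    val completed = ListBuffer.empty[String]
    try {
      try {
        completed += "task body"
      } finally {
        completed += "release A"
        // Simulate a kill arriving mid-cleanup by setting the interrupt flag.
        Thread.currentThread().interrupt()
        // A blocking call made while the flag is set throws
        // InterruptedException (and clears the flag).
        Thread.sleep(10)
        completed += "release B" // skipped because sleep threw
      }
    } catch {
      case _: InterruptedException => completed += "interrupted"
    }
    completed.toList
  }
}
```

In this sketch "release B" never runs, which is the kind of partial cleanup the question is about. Whether a real task is exposed to it depends on whether its cleanup path contains interruptible blocking calls.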