[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...
Github user kevinyu98 commented on a diff in the pull request: https://github.com/apache/spark/pull/12646#discussion_r139279542 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala --- @@ -503,69 +504,307 @@ case class FindInSet(left: Expression, right: Expression) extends BinaryExpressi override def prettyName: String = "find_in_set" } +trait String2TrimExpression extends Expression with ImplicitCastInputTypes { + + override def dataType: DataType = StringType + override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.size)(StringType) + + override def nullable: Boolean = children.exists(_.nullable) + override def foldable: Boolean = children.forall(_.foldable) +} + +object StringTrim { + def apply(str: Expression, trimStr: Expression) : StringTrim = StringTrim(str, Some(trimStr)) + def apply(str: Expression) : StringTrim = StringTrim(str, None) +} + /** - * A function that trim the spaces from both ends for the specified string. + * A function that takes a character string, removes the leading and trailing characters matching with the characters + * in the trim string, returns the new string. + * If BOTH and trimStr keywords are not specified, it defaults to remove space character from both ends. The trim + * function will have one argument, which contains the source string. + * If BOTH and trimStr keywords are specified, it trims the characters from both ends, and the trim function will have + * two arguments, the first argument contains trimStr, the second argument contains the source string. + * trimStr: A character string to be trimmed from the source string, if it has multiple characters, the function + * searches for each character in the source string, removes the characters from the source string until it + * encounters the first non-match character. + * BOTH: removes any character from both ends of the source string that matches characters in the trim string. 
*/ @ExpressionDescription( - usage = "_FUNC_(str) - Removes the leading and trailing space characters from `str`.", + usage = """ +_FUNC_(str) - Removes the leading and trailing space characters from `str`. +_FUNC_(BOTH trimStr FROM str) - Remove the leading and trailing trimString from `str` + """, + arguments = """ +Arguments: + * str - a string expression + * trimString - the trim string + * BOTH, FROM - these are keyword to specify for trim string from both ends of the string + """, examples = """ Examples: > SELECT _FUNC_('SparkSQL '); SparkSQL + > SELECT _FUNC_(BOTH 'SL' FROM 'SSparkSQLS'); + parkSQ """) -case class StringTrim(child: Expression) - extends UnaryExpression with String2StringExpression { +case class StringTrim( +srcStr: Expression, +trimStr: Option[Expression] = None) + extends String2TrimExpression { - def convert(v: UTF8String): UTF8String = v.trim() + def this (trimStr: Expression, srcStr: Expression) = this(srcStr, Option(trimStr)) + + def this(srcStr: Expression) = this(srcStr, None) override def prettyName: String = "trim" + override def children: Seq[Expression] = if (trimStr.isDefined) { +srcStr :: trimStr.get :: Nil + } else { +srcStr :: Nil + } + override def eval(input: InternalRow): Any = { +val srcString = srcStr.eval(input).asInstanceOf[UTF8String] +if (srcString == null) { + null +} else { + if (trimStr.isDefined) { +return srcString.trim(trimStr.get.eval(input).asInstanceOf[UTF8String]) + } else { +return srcString.trim() + } +} + } + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -defineCodeGen(ctx, ev, c => s"($c).trim()") +val evals = children.map(_.genCode(ctx)) +val srcString = evals(0) + +if (evals.length == 1) { + ev.copy(evals.map(_.code).mkString("\n") + s""" +boolean ${ev.isNull} = false; +UTF8String ${ev.value} = null; +if (${srcString.isNull}) { + ${ev.isNull} = true; +} else { + ${ev.value} = ${srcString.value}.trim(); +}""") +} else { + val trimString = evals(1) + val getTrimFunction = +s""" 
+if (${trimString.isNull}) { + ${ev.isNull} = true; +} else { + ${ev.value} = ${srcString.value}.trim(${trimString.value}); +}""" + ev.copy(evals.map(_.code).mkString("\n") + +s""" +boolean ${ev.isNull} = false; +UT
[GitHub] spark issue #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compression.co...
Github user fjh100456 commented on the issue: https://github.com/apache/spark/pull/19218 @dongjoon-hyun Thank you very much, I'll fix it now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...
Github user kevinyu98 commented on a diff in the pull request: https://github.com/apache/spark/pull/12646#discussion_r139278875 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala --- @@ -503,69 +504,307 @@ case class FindInSet(left: Expression, right: Expression) extends BinaryExpressi override def prettyName: String = "find_in_set" } +trait String2TrimExpression extends Expression with ImplicitCastInputTypes { + + override def dataType: DataType = StringType + override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.size)(StringType) + + override def nullable: Boolean = children.exists(_.nullable) + override def foldable: Boolean = children.forall(_.foldable) +} + +object StringTrim { + def apply(str: Expression, trimStr: Expression) : StringTrim = StringTrim(str, Some(trimStr)) + def apply(str: Expression) : StringTrim = StringTrim(str, None) +} + /** - * A function that trim the spaces from both ends for the specified string. + * A function that takes a character string, removes the leading and trailing characters matching with the characters + * in the trim string, returns the new string. + * If BOTH and trimStr keywords are not specified, it defaults to remove space character from both ends. The trim + * function will have one argument, which contains the source string. + * If BOTH and trimStr keywords are specified, it trims the characters from both ends, and the trim function will have + * two arguments, the first argument contains trimStr, the second argument contains the source string. + * trimStr: A character string to be trimmed from the source string, if it has multiple characters, the function + * searches for each character in the source string, removes the characters from the source string until it + * encounters the first non-match character. + * BOTH: removes any character from both ends of the source string that matches characters in the trim string. 
*/ @ExpressionDescription( - usage = "_FUNC_(str) - Removes the leading and trailing space characters from `str`.", + usage = """ +_FUNC_(str) - Removes the leading and trailing space characters from `str`. +_FUNC_(BOTH trimStr FROM str) - Remove the leading and trailing trimString from `str` + """, + arguments = """ +Arguments: + * str - a string expression + * trimString - the trim string + * BOTH, FROM - these are keyword to specify for trim string from both ends of the string + """, examples = """ Examples: > SELECT _FUNC_('SparkSQL '); SparkSQL + > SELECT _FUNC_(BOTH 'SL' FROM 'SSparkSQLS'); + parkSQ """) -case class StringTrim(child: Expression) - extends UnaryExpression with String2StringExpression { +case class StringTrim( +srcStr: Expression, +trimStr: Option[Expression] = None) + extends String2TrimExpression { - def convert(v: UTF8String): UTF8String = v.trim() + def this (trimStr: Expression, srcStr: Expression) = this(srcStr, Option(trimStr)) + + def this(srcStr: Expression) = this(srcStr, None) override def prettyName: String = "trim" + override def children: Seq[Expression] = if (trimStr.isDefined) { +srcStr :: trimStr.get :: Nil + } else { +srcStr :: Nil + } + override def eval(input: InternalRow): Any = { +val srcString = srcStr.eval(input).asInstanceOf[UTF8String] +if (srcString == null) { + null +} else { + if (trimStr.isDefined) { +return srcString.trim(trimStr.get.eval(input).asInstanceOf[UTF8String]) + } else { +return srcString.trim() + } +} + } + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -defineCodeGen(ctx, ev, c => s"($c).trim()") +val evals = children.map(_.genCode(ctx)) +val srcString = evals(0) + +if (evals.length == 1) { + ev.copy(evals.map(_.code).mkString("\n") + s""" --- End diff -- I changed the generate code to ev.copy(evals.map(_.code).mkString + s""" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...
Github user kevinyu98 commented on a diff in the pull request: https://github.com/apache/spark/pull/12646#discussion_r139278847 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala --- @@ -503,69 +504,307 @@ case class FindInSet(left: Expression, right: Expression) extends BinaryExpressi override def prettyName: String = "find_in_set" } +trait String2TrimExpression extends Expression with ImplicitCastInputTypes { + + override def dataType: DataType = StringType + override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.size)(StringType) + + override def nullable: Boolean = children.exists(_.nullable) + override def foldable: Boolean = children.forall(_.foldable) +} + +object StringTrim { + def apply(str: Expression, trimStr: Expression) : StringTrim = StringTrim(str, Some(trimStr)) + def apply(str: Expression) : StringTrim = StringTrim(str, None) +} + /** - * A function that trim the spaces from both ends for the specified string. + * A function that takes a character string, removes the leading and trailing characters matching with the characters + * in the trim string, returns the new string. + * If BOTH and trimStr keywords are not specified, it defaults to remove space character from both ends. The trim + * function will have one argument, which contains the source string. + * If BOTH and trimStr keywords are specified, it trims the characters from both ends, and the trim function will have + * two arguments, the first argument contains trimStr, the second argument contains the source string. + * trimStr: A character string to be trimmed from the source string, if it has multiple characters, the function + * searches for each character in the source string, removes the characters from the source string until it + * encounters the first non-match character. + * BOTH: removes any character from both ends of the source string that matches characters in the trim string. 
*/ @ExpressionDescription( - usage = "_FUNC_(str) - Removes the leading and trailing space characters from `str`.", + usage = """ +_FUNC_(str) - Removes the leading and trailing space characters from `str`. +_FUNC_(BOTH trimStr FROM str) - Remove the leading and trailing trimString from `str` + """, + arguments = """ +Arguments: + * str - a string expression + * trimString - the trim string + * BOTH, FROM - these are keyword to specify for trim string from both ends of the string + """, examples = """ Examples: > SELECT _FUNC_('SparkSQL '); SparkSQL + > SELECT _FUNC_(BOTH 'SL' FROM 'SSparkSQLS'); + parkSQ """) -case class StringTrim(child: Expression) - extends UnaryExpression with String2StringExpression { +case class StringTrim( +srcStr: Expression, +trimStr: Option[Expression] = None) + extends String2TrimExpression { - def convert(v: UTF8String): UTF8String = v.trim() + def this (trimStr: Expression, srcStr: Expression) = this(srcStr, Option(trimStr)) + + def this(srcStr: Expression) = this(srcStr, None) override def prettyName: String = "trim" + override def children: Seq[Expression] = if (trimStr.isDefined) { +srcStr :: trimStr.get :: Nil + } else { +srcStr :: Nil + } + override def eval(input: InternalRow): Any = { +val srcString = srcStr.eval(input).asInstanceOf[UTF8String] +if (srcString == null) { + null +} else { + if (trimStr.isDefined) { +return srcString.trim(trimStr.get.eval(input).asInstanceOf[UTF8String]) + } else { +return srcString.trim() + } +} + } + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -defineCodeGen(ctx, ev, c => s"($c).trim()") +val evals = children.map(_.genCode(ctx)) +val srcString = evals(0) + +if (evals.length == 1) { + ev.copy(evals.map(_.code).mkString("\n") + s""" +boolean ${ev.isNull} = false; +UTF8String ${ev.value} = null; +if (${srcString.isNull}) { + ${ev.isNull} = true; +} else { + ${ev.value} = ${srcString.value}.trim(); +}""") +} else { + val trimString = evals(1) + val getTrimFunction = +s""" 
+if (${trimString.isNull}) { + ${ev.isNull} = true; +} else { + ${ev.value} = ${srcString.value}.trim(${trimString.value}); +}""" + ev.copy(evals.map(_.code).mkString("\n") + +s""" +boolean ${ev.isNull} = false; +UT
[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...
Github user kevinyu98 commented on a diff in the pull request: https://github.com/apache/spark/pull/12646#discussion_r139278830 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -1179,6 +1179,26 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * Create a (windowed) Function expression. */ override def visitFunctionCall(ctx: FunctionCallContext): Expression = withOrigin(ctx) { +def replaceFunctions( + funcID: FunctionIdentifier, + ctx: FunctionCallContext): FunctionIdentifier = { +val opt = ctx.trimOption +if (opt != null) { + if (ctx.qualifiedName.getText.toLowerCase != "trim") { --- End diff -- I changed to toLowerCase(Locale.ROOT) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19230: [SPARK-22003][SQL] support array column in vectorized re...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19230 @liufengdb The PR description looks like an end-to-end failure. I'm curious: are you facing the failure in an end-to-end case? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19230: [SPARK-22003][SQL] support array column in vector...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19230#discussion_r139278205 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala --- @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.vectorized + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { + + var testVector: WritableColumnVector = _ + + private def allocate(capacity: Int, dt: DataType): WritableColumnVector = { +new OnHeapColumnVector(capacity, dt) + } + + override def afterEach(): Unit = { +testVector.close() + } + + test("boolean") { +testVector = allocate(10, BooleanType) +(0 until 10).foreach { i => + testVector.appendBoolean(i % 2 == 0) +} + +val array = new ColumnVector.Array(testVector) + +(0 until 10).foreach { i => + assert(array.get(i, BooleanType) === (i % 2 == 0)) +} + } + + test("byte") { +testVector = allocate(10, ByteType) +(0 until 10).foreach { i => + testVector.appendByte(i.toByte) +} + +val array = new ColumnVector.Array(testVector) + +(0 until 10).foreach { i => + assert(array.get(i, ByteType) === (i.toByte)) +} + } + + test("short") { +testVector = allocate(10, ShortType) +(0 until 10).foreach { i => + testVector.appendShort(i.toShort) +} + +val array = new ColumnVector.Array(testVector) + +(0 until 10).foreach { i => + assert(array.get(i, ShortType) === (i.toShort)) +} + } + + test("int") { +testVector = allocate(10, IntegerType) +(0 until 10).foreach { i => + testVector.appendInt(i) +} + +val array = new ColumnVector.Array(testVector) + +(0 until 10).foreach { i => + assert(array.get(i, IntegerType) === i) +} + } + + test("long") { +testVector = allocate(10, LongType) +(0 until 10).foreach { i => + testVector.appendLong(i) +} + +val array = new ColumnVector.Array(testVector) + +(0 until 10).foreach { i => + assert(array.get(i, LongType) === i) +} + } + + test("float") { +testVector = allocate(10, FloatType) +(0 until 10).foreach { i => + testVector.appendFloat(i.toFloat) +} + 
+val array = new ColumnVector.Array(testVector) + +(0 until 10).foreach { i => + assert(array.get(i, FloatType) === i.toFloat) +} + } + + test("double") { +testVector = allocate(10, DoubleType) +(0 until 10).foreach { i => + testVector.appendDouble(i.toDouble) +} + +val array = new ColumnVector.Array(testVector) + +(0 until 10).foreach { i => + assert(array.get(i, DoubleType) === i.toDouble) +} + } + + test("string") { +testVector = allocate(10, StringType) +(0 until 10).map { i => + val utf8 = s"str$i".getBytes("utf8") + testVector.appendByteArray(utf8, 0, utf8.length) +} + +val array = new ColumnVector.Array(testVector) + +(0 until 10).foreach { i => + assert(array.get(i, StringType) === UTF8String.fromString(s"str$i")) +} + } + + test("binary") { +testVector = allocate(10, BinaryType) +(0 until 10).map { i => + val utf8 = s"str$i".getBytes("utf8") + testVector.appendByteArray(utf8, 0, utf8.length) +} + +val array = new ColumnVector.Array(testVector) + +(0 u
[GitHub] spark issue #19239: [SPARK-22017] Take minimum of all watermark execs in Str...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/19239 Thank you @joseph-torres, merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19239 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19239: [SPARK-22017] Take minimum of all watermark execs in Str...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19239 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81837/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19239: [SPARK-22017] Take minimum of all watermark execs in Str...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19239 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19239: [SPARK-22017] Take minimum of all watermark execs in Str...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19239 **[Test build #81837 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81837/testReport)** for PR 19239 at commit [`cdf4361`](https://github.com/apache/spark/commit/cdf4361f6065e4e1d891992ebc30289957a6262f). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on the issue: https://github.com/apache/spark/pull/16578 > * There is a minor change needed though, "parquetFormat: ParquetFileFormat" should be replaced by "fileFormat: FileFormat" as there is no dependency on the actual ParquetFileFormat class defined in parquet package https://github.com/apache/spark/pull/16578/files?diff=unified#diff-3bad814b3336a83f360d7395bd740759R38 This is true. I hesitate to weaken the match to all instances of `FileFormat` because the only format I have extensive experience and knowledge of is Parquet. There are other formats that could expand upon this work, such as ORC and ROOT, but I have no practical experience working with those. I'd prefer someone who does have such experience build on this PR to make it work with those file formats. Incidentally, I added a `ColumnarFileFormat` trait in this PR. You might consider it a marker for columnar file formats, but all it really does right now is compute the number of physical columns read for a given catalyst schema. This value is used in the description of a physical plan to help a dev/user ensure that their expected column pruning is in fact occurring. > * And may be renaming this ParquetSchemaPruning and taking it outside of the parquet package as it is quite more general than just for parquet, otherwise I have to add a special Rule here, https://github.com/apache/spark/pull/16578/files?diff=unified#diff-2370d8ed85930c93ef8e5ce67abca53fR35 ??? Moving and renaming `ParquetSchemaPruning` makes sense if it's generalized to other file formats. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19219: [SPARK-21993][SQL][WIP] Close sessionState when finish
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19219 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19219: [SPARK-21993][SQL][WIP] Close sessionState when finish
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19219 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81838/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19219: [SPARK-21993][SQL][WIP] Close sessionState when finish
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19219 **[Test build #81838 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81838/testReport)** for PR 19219 at commit [`1187dac`](https://github.com/apache/spark/commit/1187dac360a9cb23466781786ea5435e53f8d1d6). * This patch **fails to build**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19219: [SPARK-21993][SQL][WIP] Close sessionState when finish
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19219 **[Test build #81838 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81838/testReport)** for PR 19219 at commit [`1187dac`](https://github.com/apache/spark/commit/1187dac360a9cb23466781786ea5435e53f8d1d6). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17862: [SPARK-20602] [ML]Adding LBFGS optimizer and Squared_hin...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17862 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19230: [SPARK-22003][SQL] support array column in vectorized re...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19230 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81835/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17862: [SPARK-20602] [ML]Adding LBFGS optimizer and Squared_hin...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17862 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81836/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19230: [SPARK-22003][SQL] support array column in vectorized re...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19230 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17862: [SPARK-20602] [ML]Adding LBFGS optimizer and Squared_hin...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17862 **[Test build #81836 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81836/testReport)** for PR 17862 at commit [`0f5cad5`](https://github.com/apache/spark/commit/0f5cad5ca9770871fb2a07968f53332f03e74903). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19230: [SPARK-22003][SQL] support array column in vectorized re...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19230 **[Test build #81835 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81835/testReport)** for PR 19230 at commit [`19502f9`](https://github.com/apache/spark/commit/19502f99e2d7cb5508f1bb155a78ee2b32c0bd38). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19239: [SPARK-22017] Take minimum of all watermark execs in Str...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19239 **[Test build #81837 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81837/testReport)** for PR 19239 at commit [`cdf4361`](https://github.com/apache/spark/commit/cdf4361f6065e4e1d891992ebc30289957a6262f). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17862: [SPARK-20602] [ML]Adding LBFGS optimizer and Squared_hin...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17862 **[Test build #81836 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81836/testReport)** for PR 17862 at commit [`0f5cad5`](https://github.com/apache/spark/commit/0f5cad5ca9770871fb2a07968f53332f03e74903). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18659 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18659 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81834/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18659 **[Test build #81834 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81834/testReport)** for PR 18659 at commit [`25e3a71`](https://github.com/apache/spark/commit/25e3a715e990709043daaaf23e5de088418a83ee). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19230: [SPARK-22003][SQL] support array column in vector...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19230#discussion_r139272552 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java --- @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.vectorized; +import org.apache.spark.api.java.function.Function; --- End diff -- This import is not used now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19180: [SPARK-21967][CORE] org.apache.spark.unsafe.types.UTF8St...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19180 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81831/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19180: [SPARK-21967][CORE] org.apache.spark.unsafe.types.UTF8St...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19180 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19180: [SPARK-21967][CORE] org.apache.spark.unsafe.types.UTF8St...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19180 **[Test build #81831 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81831/testReport)** for PR 19180 at commit [`0ec7111`](https://github.com/apache/spark/commit/0ec7111932df67056398fd3542c72afb0ae95002). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19239: [SPARK-22017] Take minimum of all watermark execs in Str...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19239 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19239: [SPARK-22017] Take minimum of all watermark execs in Str...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19239 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81832/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19239: [SPARK-22017] Take minimum of all watermark execs in Str...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19239 **[Test build #81832 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81832/testReport)** for PR 19239 at commit [`13affc7`](https://github.com/apache/spark/commit/13affc74b6304f7e2357d392716b68a83e273f29). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19230: [SPARK-22003][SQL] support array column in vectorized re...
Github user liufengdb commented on the issue: https://github.com/apache/spark/pull/19230 @viirya @cloud-fan unit test updated. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19230: [SPARK-22003][SQL] support array column in vectorized re...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19230 **[Test build #81835 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81835/testReport)** for PR 19230 at commit [`19502f9`](https://github.com/apache/spark/commit/19502f99e2d7cb5508f1bb155a78ee2b32c0bd38). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19250 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19250 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81833/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19250 **[Test build #81833 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81833/testReport)** for PR 19250 at commit [`c5571a8`](https://github.com/apache/spark/commit/c5571a8de509954ed2e90e4954a2e77ac7ad9627). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139261343 --- Diff: python/pyspark/serializers.py --- @@ -199,6 +211,33 @@ def __repr__(self): return "ArrowSerializer" +class ArrowPandasSerializer(ArrowSerializer): + +def __init__(self): +super(ArrowPandasSerializer, self).__init__() + +def dumps(self, series): +""" +Make an ArrowRecordBatch from a Pandas Series and serialize +""" +import pyarrow as pa --- End diff -- Yeah, it would probably be best to handle it the same way as in `toPandas()`. That got me thinking that it is a little weird to have an SQLConf "spark.sql.execution.arrow.enable" that is set for `toPandas()` but has no bearing on `pandas_udf`. It doesn't need to since it is an explicit call but seems a little contradictory, what do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/18659 @ueshin , I merged your tests and added support for `**kwargs` to use "size" for 0-parameter UDFs. Do you think this might be a little better to be called "length" or "output_length"? I still need to fix the tests with null values, it shouldn't be a problem with the fix you provided. I'll add that soon. Otherwise I believe other tests are working, but we probably need to check chained UDFs also. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139259901 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala --- @@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche ) } + test("watermark with 2 streams") { +val first = MemoryStream[Int] + +val firstDf = first.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "10 seconds") + .select('value) + +val second = MemoryStream[Int] + +val secondDf = second.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "5 seconds") + .select('value) + +val union = firstDf.union(secondDf) + .writeStream + .format("memory") + .queryName("test") + .start() + +def getWatermarkAfterData( +firstData: Seq[Int] = Seq.empty, +secondData: Seq[Int] = Seq.empty): Long = { + if (firstData.nonEmpty) first.addData(firstData) + if (secondData.nonEmpty) second.addData(secondData) + union.processAllAvailable() + // add a dummy batch so lastExecution has the new watermark + first.addData(0) + union.processAllAvailable() + // get last watermark + val lastExecution = union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution + lastExecution.offsetSeqMetadata.batchWatermarkMs +} + +// Global watermark starts at 0 until we get data from both sides +assert(getWatermarkAfterData(firstData = Seq(11)) == 0) +assert(getWatermarkAfterData(secondData = Seq(6)) == 1000) +// Global watermark stays at left watermark 1 when right watermark moves to 2 +assert(getWatermarkAfterData(secondData = Seq(8)) == 1000) +// Global watermark switches to right side value 2 when left watermark goes higher +assert(getWatermarkAfterData(firstData = Seq(21)) == 3000) +// Global watermark goes back to left +assert(getWatermarkAfterData(secondData = Seq(17, 28, 39)) == 11000) +// Global watermark stays on left as long as it's below right 
+assert(getWatermarkAfterData(firstData = Seq(31)) == 21000) +assert(getWatermarkAfterData(firstData = Seq(41)) == 31000) +// Global watermark switches back to right again +assert(getWatermarkAfterData(firstData = Seq(51)) == 34000) + +// Global watermark is updated correctly with simultaneous data from both sides +assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 90000) +assert(getWatermarkAfterData(firstData = Seq(120), secondData = Seq(110)) == 105000) +assert(getWatermarkAfterData(firstData = Seq(130), secondData = Seq(125)) == 120000) + +// Global watermark doesn't decrement with simultaneous data +assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 120000) +assert(getWatermarkAfterData(firstData = Seq(140), secondData = Seq(100)) == 120000) +assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(135)) == 130000) --- End diff -- test recovery of the minimum after a restart. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18659 **[Test build #81834 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81834/testReport)** for PR 18659 at commit [`25e3a71`](https://github.com/apache/spark/commit/25e3a715e990709043daaaf23e5de088418a83ee). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19250 **[Test build #81833 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81833/testReport)** for PR 19250 at commit [`c5571a8`](https://github.com/apache/spark/commit/c5571a8de509954ed2e90e4954a2e77ac7ad9627). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19239: [SPARK-22017] Take minimum of all watermark execs in Str...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19239 **[Test build #81832 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81832/testReport)** for PR 19239 at commit [`13affc7`](https://github.com/apache/spark/commit/13affc74b6304f7e2357d392716b68a83e273f29). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19074: [SPARK-21714][CORE][BACKPORT-2.2] Avoiding re-uploading ...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19074 @loneknightpy can you file a new bug instead of commenting on a closed PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19074: [SPARK-21714][CORE][BACKPORT-2.2] Avoiding re-upl...
Github user loneknightpy commented on a diff in the pull request: https://github.com/apache/spark/pull/19074#discussion_r139248521 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -311,12 +317,16 @@ object SparkSubmit extends CommandLineUtils { } // In client mode, download remote files. +var localPrimaryResource: String = null +var localJars: String = null +var localPyFiles: String = null +var localFiles: String = null if (deployMode == CLIENT) { --- End diff -- If you want to avoid downloading for YARN, can we just check the cluster mode here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19074: [SPARK-21714][CORE][BACKPORT-2.2] Avoiding re-upl...
Github user loneknightpy commented on a diff in the pull request: https://github.com/apache/spark/pull/19074#discussion_r139247692 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -366,7 +376,7 @@ object SparkSubmit extends CommandLineUtils { // If a python file is provided, add it to the child arguments and list of files to deploy. // Usage: PythonAppRunner [app arguments] args.mainClass = "org.apache.spark.deploy.PythonRunner" -args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs +args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs if (clusterManager != YARN) { // The YARN backend distributes the primary file differently, so don't merge it. args.files = mergeFileLists(args.files, args.primaryResource) --- End diff -- It is a behavior change here. I think we should use `localFiles` and `localPrimaryResource` instead of `args.files` and `args.primaryResource`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19074: [SPARK-21714][CORE][BACKPORT-2.2] Avoiding re-upl...
Github user loneknightpy commented on a diff in the pull request: https://github.com/apache/spark/pull/19074#discussion_r139248109 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -376,8 +386,8 @@ object SparkSubmit extends CommandLineUtils { // The YARN backend handles python files differently, so don't merge the lists. args.files = mergeFileLists(args.files, args.pyFiles) --- End diff -- ditto. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19180: [SPARK-21967][CORE] org.apache.spark.unsafe.types.UTF8St...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19180 **[Test build #81831 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81831/testReport)** for PR 19180 at commit [`0ec7111`](https://github.com/apache/spark/commit/0ec7111932df67056398fd3542c72afb0ae95002). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19180: [SPARK-21967][CORE] org.apache.spark.unsafe.types.UTF8St...
Github user original-brownbear commented on the issue: https://github.com/apache/spark/pull/19180 @srowen alright then, switched to the JDK comparison left the rest as is :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19211: [SPARK-18838][core] Add separate listener queues to Live...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19211 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19211: [SPARK-18838][core] Add separate listener queues to Live...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19211 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81826/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19211: [SPARK-18838][core] Add separate listener queues to Live...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19211 **[Test build #81826 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81826/testReport)** for PR 19211 at commit [`ed71477`](https://github.com/apache/spark/commit/ed714778385f8dcb117f3c01a204bd9b024ea83c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19250 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81830/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19250 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19250 **[Test build #81830 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81830/testReport)** for PR 19250 at commit [`5105b72`](https://github.com/apache/spark/commit/5105b728dafa821d6063af97cfad0f49f029726a). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18978: [SPARK-21737][YARN]Create communication channel between ...
Github user tgravescs commented on the issue: https://github.com/apache/spark/pull/18978 We can close for now; I got busy with some other stuff. Hope to get back to this soon. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18978: [SPARK-21737][YARN]Create communication channel between ...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/18978 Any updates here? Or should we close this PR until this is properly investigated? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18897: [SPARK-21655][YARN] Support Kill CLI for Yarn mode
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/18897 Can we close this PR until #18978 is figured out? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19250 **[Test build #81830 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81830/testReport)** for PR 19250 at commit [`5105b72`](https://github.com/apache/spark/commit/5105b728dafa821d6063af97cfad0f49f029726a). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19250: [SPARK-12297] Table timezone correction for Times...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19250#discussion_r139234142 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala --- @@ -92,7 +92,7 @@ case class CreateHiveTableAsSelectCommand( } override def argString: String = { -s"[Database:${tableDesc.database}}, " + +s"[Database:${tableDesc.database}, " + --- End diff -- totally unrelated typo fix, but didn't seem worth an independent pr --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19250: [SPARK-12297] Table timezone correction for Times...
GitHub user squito opened a pull request: https://github.com/apache/spark/pull/19250 [SPARK-12297] Table timezone correction for Timestamps ## What changes were proposed in this pull request? When reading and writing data, spark will adjust timestamp data based on the delta between the current session timezone and the table time zone (specified either by a persistent table property, or an option to the DataFrameReader / Writer). This is particularly important for parquet data, so that it can be treated equivalently by other SQL engines (eg. Impala and Hive). Furthermore, this is useful if the same data is processed by multiple clusters in different time zones, and "timestamp without time zone" semantics are desired. ## How was this patch tested? Unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/squito/spark timestamp_all_formats Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19250.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19250 commit 54f87c2c1e0ab0645fa5497553cf031f13e98c3b Author: Imran Rashid Date: 2017-08-28T19:52:15Z SPARK-12297. Table timezones. commit 53b9fbe0c6128ec11afdb46d3239c693129f6952 Author: Imran Rashid Date: 2017-09-14T20:18:46Z All dataformats support timezone correction. Move rules & tests to a more appropriate location. Ensure rule works without hive support. Extra checks on when table timezones are set. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19246: [SPARK-22025] Speeding up fromInternal for StructField
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/19246 Hi, @maver1ck . Could you add your benchmark result on PR description for reviewers? That will be your commit log finally. > greatly speed up function calling --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19249: [SPARK-22032] Speed up StructType.fromInternal
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19249 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81829/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19249: [SPARK-22032] Speed up StructType.fromInternal
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19249 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19249: [SPARK-22032] Speed up StructType.fromInternal
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19249 **[Test build #81829 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81829/testReport)** for PR 19249 at commit [`64afb16`](https://github.com/apache/spark/commit/64afb16ead8126ff59a35288e0c43dc31e6db23c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user brad-kaiser commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r139223468 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala --- @@ -116,4 +121,11 @@ private[spark] object BlockManagerMessages { case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster case class HasCachedBlocks(executorId: String) extends ToBlockManagerMaster + + case class GetCachedBlocks(executorId: String) extends ToBlockManagerMaster + + case class GetSizeOfBlocks(blocks: Seq[(String, BlockId)]) extends ToBlockManagerMaster + + case class ReplicateOneBlock(executorId: String, blockId: BlockId, exclude: Seq[String]) --- End diff -- fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user brad-kaiser commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r139223492 --- Diff: docs/configuration.md --- @@ -1705,6 +1705,26 @@ Apart from these, the following properties are also available, and may be useful description. + + spark.dynamicAllocation.recoverCachedData + false + + If dynamic allocation is enabled, and --- End diff -- fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user brad-kaiser commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r139223409 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala --- @@ -32,8 +32,13 @@ private[spark] object BlockManagerMessages { // blocks that the master knows about. case class RemoveBlock(blockId: BlockId) extends ToBlockManagerSlave + // Replicate block excluding certain executors for graceful shutdown. --- End diff -- Removed both comments because the old one isn't accurate anymore. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user brad-kaiser commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r139223321 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -1231,6 +1231,7 @@ private[spark] class BlockManager( def replicateBlock( blockId: BlockId, existingReplicas: Set[BlockManagerId], + excluding: Set[BlockManagerId], --- End diff -- Left it as excluding. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user brad-kaiser commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r139223199 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -612,25 +623,26 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp force: Boolean): Seq[String] = { logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}") -val response = synchronized { +val response: Future[Seq[String]] = synchronized { val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains) - unknownExecutors.foreach { id => -logWarning(s"Executor to kill $id does not exist!") - } + unknownExecutors.foreach(id => logWarning(s"Executor to kill $id does not exist!")) // If an executor is already pending to be removed, do not kill it again (SPARK-9795) // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552) val executorsToKill = knownExecutors -.filter { id => !executorsPendingToRemove.contains(id) } -.filter { id => force || !scheduler.isExecutorBusy(id) } - executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace } +.filter(id => !executorsPendingToRemove.contains(id)) --- End diff -- reverted --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19249: [SPARK-22032] Speed up StructType.fromInternal
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19249 **[Test build #81829 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81829/testReport)** for PR 19249 at commit [`64afb16`](https://github.com/apache/spark/commit/64afb16ead8126ff59a35288e0c43dc31e6db23c). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19249: [SPARK-22032] Speed up StructType.fromInternal
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19249 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81828/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19249: [SPARK-22032] Speed up StructType.fromInternal
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19249 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19249: [SPARK-22032] Speed up StructType.fromInternal
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19249 **[Test build #81828 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81828/testReport)** for PR 19249 at commit [`e4d7f76`](https://github.com/apache/spark/commit/e4d7f76f22007e7e77982da507b97b314ebd4b41). * This patch **fails Python style tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19249: [SPARK-22032] Speed up StructType.fromInternal
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19249 **[Test build #81828 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81828/testReport)** for PR 19249 at commit [`e4d7f76`](https://github.com/apache/spark/commit/e4d7f76f22007e7e77982da507b97b314ebd4b41). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19249: [SPARK-22032] Speed up StructType.fromInternal
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19249 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81827/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19249: [SPARK-22032] Speed up StructType.fromInternal
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19249 **[Test build #81827 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81827/testReport)** for PR 19249 at commit [`aa69a72`](https://github.com/apache/spark/commit/aa69a72d71c55e93b487ac28910b9187c0c71088). * This patch **fails Python style tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19249: [SPARK-22032] Speed up StructType.fromInternal
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19249 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19249: [SPARK-22032] Speed up StructType.fromInternal
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19249 **[Test build #81827 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81827/testReport)** for PR 19249 at commit [`aa69a72`](https://github.com/apache/spark/commit/aa69a72d71c55e93b487ac28910b9187c0c71088). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19249: [SPARK-22032] Speed up StructType.fromInternal
GitHub user maver1ck opened a pull request: https://github.com/apache/spark/pull/19249 [SPARK-22032] Speed up StructType.fromInternal ## What changes were proposed in this pull request? StructType.fromInternal is calling f.fromInternal(v) for every field. We can use the needConversion method to limit the number of function calls. ## How was this patch tested? Existing tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/maver1ck/spark spark_22032 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19249.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19249 commit aa69a72d71c55e93b487ac28910b9187c0c71088 Author: Maciej Bryński Date: 2017-09-15T18:01:40Z Update types.py --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19211: [SPARK-18838][core] Add separate listener queues to Live...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19211 **[Test build #81826 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81826/testReport)** for PR 19211 at commit [`ed71477`](https://github.com/apache/spark/commit/ed714778385f8dcb117f3c01a204bd9b024ea83c). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19238: [SPARK-22016][SQL] Add HiveDialect for JDBC conne...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19238#discussion_r139203753 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala --- @@ -1103,6 +1103,17 @@ class JDBCSuite extends SparkFunSuite """.stripMargin) val df3 = sql("SELECT * FROM test_sessionInitStatement") - assert(df3.collect() === Array(Row(21519, 1234))) -} + assert(df3.collect() === Array(Row(21519, 1234)) +) --- End diff -- Ur, actually, I meant the original Spark code is also wrong in terms of indentation. You can fix the indentation of original lines 1105~1107 here. :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19238: [SPARK-22016][SQL] Add HiveDialect for JDBC conne...
Github user danielfx90 commented on a diff in the pull request: https://github.com/apache/spark/pull/19238#discussion_r139202470 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala --- @@ -1103,6 +1103,17 @@ class JDBCSuite extends SparkFunSuite """.stripMargin) val df3 = sql("SELECT * FROM test_sessionInitStatement") - assert(df3.collect() === Array(Row(21519, 1234))) -} + assert(df3.collect() === Array(Row(21519, 1234)) +) --- End diff -- @dongjoon-hyun done! Thank you! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19238: [SPARK-22016][SQL] Add HiveDialect for JDBC conne...
Github user danielfx90 commented on a diff in the pull request: https://github.com/apache/spark/pull/19238#discussion_r139201902 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala --- @@ -1103,6 +1103,17 @@ class JDBCSuite extends SparkFunSuite """.stripMargin) val df3 = sql("SELECT * FROM test_sessionInitStatement") - assert(df3.collect() === Array(Row(21519, 1234))) -} + assert(df3.collect() === Array(Row(21519, 1234)) +) --- End diff -- It must have changed when formatting the code using the IDE. Scalastyle checks passed, but let me roll that back anyway. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15770: [SPARK-15784][ML]:Add Power Iteration Clustering to spar...
Github user wangmiao1981 commented on the issue: https://github.com/apache/spark/pull/15770 I will address the review comments soon. Thanks! @WeichenXu123 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19211#discussion_r139199768 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -65,53 +60,61 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { /** When `droppedEventsCounter` was logged last time in milliseconds. */ @volatile private var lastReportTimestamp = 0L - // Indicate if we are processing some event - // Guarded by `self` - private var processingEvent = false - - private val logDroppedEvent = new AtomicBoolean(false) - - // A counter that represents the number of events produced and consumed in the queue - private val eventLock = new Semaphore(0) - - private val listenerThread = new Thread(name) { -setDaemon(true) -override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { - LiveListenerBus.withinListenerThread.withValue(true) { -val timer = metrics.eventProcessingTime -while (true) { - eventLock.acquire() - self.synchronized { -processingEvent = true - } - try { -val event = eventQueue.poll -if (event == null) { - // Get out of the while loop and shutdown the daemon thread - if (!stopped.get) { -throw new IllegalStateException("Polling `null` from eventQueue means" + - " the listener bus has been stopped. So `stopped` must be true") - } - return -} -val timerContext = timer.time() -try { - postToAll(event) -} finally { - timerContext.stop() -} - } finally { -self.synchronized { - processingEvent = false -} - } + private val queues = new CopyOnWriteArrayList[AsyncEventQueue]() --- End diff -- I can add specific methods for each internal queue; but I'd like to keep the internal management of queues more generic. One of the ideas in #18253 was to allow filtering of which events are enqueued at all (e.g. don't enqueue `SparkListenerBlockUpdated` because it's not written to the event logs, reducing the load on the event log queue). 
Leaving the internal management more generic would allow that to be more easily / cleanly implemented later (instead of "addToEventLogQueue", you'd have a "addCustomQueue" method with a subclass of `AsyncEventQueue`). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19248: [SPARK-22027] Add missing explanation of default ...
Github user exKAZUu commented on a diff in the pull request: https://github.com/apache/spark/pull/19248#discussion_r139198364 --- Diff: mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala --- @@ -44,7 +44,7 @@ private[ml] trait HasRegParam extends Params { private[ml] trait HasMaxIter extends Params { /** - * Param for maximum number of iterations (>= 0). + * Param for maximum number of iterations (>= 0). (default = 20) --- End diff -- I think the default value (20) comes from https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/tree/treeParams.scala#L498 Is it wrong? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19196#discussion_r139197660 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala --- @@ -381,4 +388,187 @@ class StreamingAggregationSuite extends StateStoreMetricsTest AddData(streamInput, 0, 1, 2, 3), CheckLastBatch((0, 0, 2), (1, 1, 3))) } + + /** + * This method verifies certain properties in the SparkPlan of a streaming aggregation. + * First of all, it checks that the child of a `StateStoreRestoreExec` creates the desired + * data distribution, where the child could be an Exchange, or a `HashAggregateExec` which already + * provides the expected data distribution. + * + * The second thing it checks that the child provides the expected number of partitions. + * + * The third thing it checks that we don't add an unnecessary shuffle in-between + * `StateStoreRestoreExec` and `StateStoreSaveExec`. + */ + private def checkAggregationChain( + se: StreamExecution, + expectShuffling: Boolean, + expectedPartition: Int): Boolean = { +val executedPlan = se.lastExecution.executedPlan +val restore = executedPlan + .collect { case ss: StateStoreRestoreExec => ss } + .head +restore.child match { + case node: UnaryExecNode => +assert(node.outputPartitioning.numPartitions === expectedPartition, + "Didn't get the expected number of partitions.") +if (expectShuffling) { + assert(node.isInstanceOf[Exchange], s"Expected a shuffle, got: ${node.child}") +} else { + assert(!node.isInstanceOf[Exchange], "Didn't expect a shuffle") +} + + case _ => fail("Expected no shuffling") +} +var reachedRestore = false +// Check that there should be no exchanges after `StateStoreRestoreExec` +executedPlan.foreachUp { p => + if (reachedRestore) { +assert(!p.isInstanceOf[Exchange], "There should be no further exchanges") + } else { +reachedRestore = p.isInstanceOf[StateStoreRestoreExec] + } +} +true + } + + /** Add blocks of data to the `BlockRDDBackedSource`. 
*/ + case class AddBlockData(source: BlockRDDBackedSource, data: Seq[Int]*) extends AddData { +override def addData(query: Option[StreamExecution]): (Source, Offset) = { + if (data.nonEmpty) { +data.foreach(source.addData) + } else { +// we would like to create empty blockRDD's so add an empty block here. +source.addData() + } + source.releaseLock() + (source, LongOffset(source.counter)) +} + } + + test("SPARK-21977: coalesce(1) with 0 partition RDD should be repartitioned to 1") { +val inputSource = new BlockRDDBackedSource(spark) +MockSourceProvider.withMockSources(inputSource) { + withTempDir { tempDir => +val aggregated: Dataset[Long] = + spark.readStream +.format((new MockSourceProvider).getClass.getCanonicalName) +.load() +.coalesce(1) +.groupBy() +.count() +.as[Long] + +testStream(aggregated, Complete())( + AddBlockData(inputSource, Seq(1)), + CheckLastBatch(1), + AssertOnQuery("Verify no shuffling") { se => +checkAggregationChain(se, expectShuffling = false, 1) + }, + AddBlockData(inputSource), // create an empty trigger + CheckLastBatch(1), + AssertOnQuery("Verify addition of exchange operator") { se => +checkAggregationChain(se, expectShuffling = true, 1) + }, + AddBlockData(inputSource, Seq(2, 3)), + CheckLastBatch(3), + AddBlockData(inputSource), + CheckLastBatch(3), + StopStream +) + } +} + } + + test("SPARK-21977: coalesce(1) should still be repartitioned when it has keyExpressions") { +val inputSource = new BlockRDDBackedSource(spark) +MockSourceProvider.withMockSources(inputSource) { + withTempDir { tempDir => + +def createDf(partitions: Int): Dataset[(Long, Long)] = { + spark.readStream +.format((new MockSourceProvider).getClass.getCanonicalName) +.load() +.coalesce(partitions) +.groupBy('a % 1) // just to give it a fake key +.count() +.as[(Long, Long)] +} + +testStream(createDf(1), Complete())( + StartStream(ch
[GitHub] spark issue #19227: [SPARK-20060][CORE] Support accessing secure Hadoop clus...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19227 > If people have a standalone cluster they have secured off for only a set of users and they all run under a single user, having access to secure HDFS is still very useful. No disputing that, but you can do that today by manually deploying a keytab to all worker nodes, and making sure it's "kinit'ed" before the worker daemon goes up (and the login is refreshed periodically). No changes in Spark needed for that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19243: [SPARK-21780][R] Simpler Dataset.sample API in R
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19243 Sure. This one is a bit tricky. Let me try to find out a better way. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19243: [SPARK-21780][R] Simpler Dataset.sample API in R
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/19243 let me think about this a bit... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139188790 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression, ExpressionDescription} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper} +import org.apache.spark.sql.types._ + +/** + * This function counts the approximate number of distinct values (ndv) in + * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints will be + * sorted into ascending order. To count ndv's in these intervals, apply the HyperLogLogPlusPlus + * algorithm in each of them. 
+ * @param child to estimate the ndv's of. + * @param endpointsExpression to construct the intervals. + * @param relativeSD The maximum estimation error allowed in the HyperLogLogPlusPlus algorithm. + */ +@ExpressionDescription( --- End diff -- we don't need this if it's an internal func --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139190161 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression, ExpressionDescription} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper} +import org.apache.spark.sql.types._ + +/** + * This function counts the approximate number of distinct values (ndv) in + * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints will be + * sorted into ascending order. To count ndv's in these intervals, apply the HyperLogLogPlusPlus + * algorithm in each of them. 
+ * @param child to estimate the ndv's of. + * @param endpointsExpression to construct the intervals. + * @param relativeSD The maximum estimation error allowed in the HyperLogLogPlusPlus algorithm. + */ +@ExpressionDescription( + usage = """ +_FUNC_(col, array(endpoint_1, endpoint_2, ... endpoint_N)) - Returns the approximate + number of distinct values (ndv) for intervals [endpoint_1, endpoint_2], + (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N]. + +_FUNC_(col, array(endpoint_1, endpoint_2, ... endpoint_N), relativeSD=0.05) - Returns + the approximate number of distinct values (ndv) for intervals with relativeSD, the maximum + estimation error allowed in the HyperLogLogPlusPlus algorithm. + """, + extended = """ +Examples: + > SELECT approx_count_distinct_for_intervals(10.0, array(5, 15, 25), 0.01); + [1, 0] + """) +case class ApproxCountDistinctForIntervals( +child: Expression, +endpointsExpression: Expression, +relativeSD: Double = 0.05, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) + extends ImperativeAggregate with ExpectsInputTypes { + + def this(child: Expression, endpointsExpression: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = 0.05, + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + def this(child: Expression, endpointsExpression: Expression, relativeSD: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD), + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + override def inputTypes: Seq[AbstractDataType] = { +Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType) + } + + // Mark as lazy so that endpointsExpression is not evaluated during tree transformation. 
+ lazy val endpoints: Array[Double] = { +val doubleArray = (endpointsExpression.dataType, endpointsExpression.eval()) match { + case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) => +val numericArray = arrayData.toObjectArray(baseType) +numericArray.map { x => + baseType.numeric.toDouble(x.asInstanceOf[baseType.InternalType]) +} +} +util.Arrays.sort(doubleArray) --- End diff -- again, if it's only used internally, we can require the caller side to pass the endpoints sorted. --- - To unsubscribe, e-mail: reviews-unsu
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139190528 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression, ExpressionDescription} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper} +import org.apache.spark.sql.types._ + +/** + * This function counts the approximate number of distinct values (ndv) in + * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints will be + * sorted into ascending order. To count ndv's in these intervals, apply the HyperLogLogPlusPlus + * algorithm in each of them. 
+ * @param child to estimate the ndv's of. + * @param endpointsExpression to construct the intervals. + * @param relativeSD The maximum estimation error allowed in the HyperLogLogPlusPlus algorithm. + */ +@ExpressionDescription( + usage = """ +_FUNC_(col, array(endpoint_1, endpoint_2, ... endpoint_N)) - Returns the approximate + number of distinct values (ndv) for intervals [endpoint_1, endpoint_2], + (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N]. + +_FUNC_(col, array(endpoint_1, endpoint_2, ... endpoint_N), relativeSD=0.05) - Returns + the approximate number of distinct values (ndv) for intervals with relativeSD, the maximum + estimation error allowed in the HyperLogLogPlusPlus algorithm. + """, + extended = """ +Examples: + > SELECT approx_count_distinct_for_intervals(10.0, array(5, 15, 25), 0.01); + [1, 0] + """) +case class ApproxCountDistinctForIntervals( +child: Expression, +endpointsExpression: Expression, +relativeSD: Double = 0.05, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) + extends ImperativeAggregate with ExpectsInputTypes { + + def this(child: Expression, endpointsExpression: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = 0.05, + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + def this(child: Expression, endpointsExpression: Expression, relativeSD: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD), + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + override def inputTypes: Seq[AbstractDataType] = { +Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType) + } + + // Mark as lazy so that endpointsExpression is not evaluated during tree transformation. 
+ lazy val endpoints: Array[Double] = { +val doubleArray = (endpointsExpression.dataType, endpointsExpression.eval()) match { + case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) => +val numericArray = arrayData.toObjectArray(baseType) +numericArray.map { x => + baseType.numeric.toDouble(x.asInstanceOf[baseType.InternalType]) +} +} +util.Arrays.sort(doubleArray) +doubleArray + } + + override def checkInputDataTypes(): TypeCheckResult = { +val defaultCheck = super.checkInputDataTypes() +if (defaultCheck.isFailure) { + defaultCheck +} else if (!en
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139188326 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala --- @@ -270,6 +270,7 @@ object FunctionRegistry { expression[Remainder]("%"), // aggregate functions + expression[ApproxCountDistinctForIntervals]("approx_count_distinct_for_intervals"), --- End diff -- If it's only used internally, we don't need to register it here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19238: [SPARK-22016][SQL] Add HiveDialect for JDBC conne...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19238#discussion_r139188633 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala --- @@ -1103,6 +1103,17 @@ class JDBCSuite extends SparkFunSuite """.stripMargin) val df3 = sql("SELECT * FROM test_sessionInitStatement") - assert(df3.collect() === Array(Row(21519, 1234))) -} + assert(df3.collect() === Array(Row(21519, 1234)) +) --- End diff -- This ')' is wrong. Lines 1105~1107 from the original have an indentation issue. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19237: [SPARK-21987][SQL] fix a compatibility issue of sql even...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19237 Thanks! Merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org