svn commit: r30107 - in /dev/spark/2.4.1-SNAPSHOT-2018_10_16_22_02-3591bd2-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Oct 17 05:20:25 2018 New Revision: 30107 Log: Apache Spark 2.4.1-SNAPSHOT-2018_10_16_22_02-3591bd2 docs [This commit notification would consist of 1472 parts, which exceeds the 50-part limit, so it was shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SQL][CATALYST][MINOR] update some error comments
Repository: spark Updated Branches: refs/heads/branch-2.4 144cb949d -> 3591bd229 [SQL][CATALYST][MINOR] update some error comments ## What changes were proposed in this pull request? This PR corrects two comment errors: 1. change "as low a possible" to "as low as possible" in RewriteDistinctAggregates.scala 2. delete the redundant word "with" in HiveTableScanExec's doExecute() method ## How was this patch tested? Existing unit tests. Closes #22694 from CarolinePeng/update_comment. Authored-by: 彭灿00244106 <00244106@zte.intra> Signed-off-by: hyukjinkwon (cherry picked from commit e9332f600eb4f275b3bff368863a68c2a4349182) Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3591bd22 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3591bd22 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3591bd22 Branch: refs/heads/branch-2.4 Commit: 3591bd2293f49ac8023166597704ad1bd21dabe9 Parents: 144cb94 Author: 彭灿00244106 <00244106@zte.intra> Authored: Wed Oct 17 12:45:13 2018 +0800 Committer: hyukjinkwon Committed: Wed Oct 17 12:45:30 2018 +0800 -- .../spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala | 4 ++-- .../org/apache/spark/sql/hive/execution/HiveTableScanExec.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3591bd22/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala index 4448ace..b946800 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala @@ -95,7 +95,7 @@ import org.apache.spark.sql.types.IntegerType * * This rule duplicates the input data by two or more times (# distinct groups + an optional * non-distinct group). This will put quite a bit of memory pressure of the used aggregate and - * exchange operators. Keeping the number of distinct groups as low a possible should be priority, + * exchange operators. Keeping the number of distinct groups as low as possible should be priority, * we could improve this in the current rule by applying more advanced expression canonicalization * techniques. */ @@ -241,7 +241,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { groupByAttrs ++ distinctAggChildAttrs ++ Seq(gid) ++ regularAggChildAttrMap.map(_._2), a.child) - // Construct the first aggregate operator. This de-duplicates the all the children of + // Construct the first aggregate operator. This de-duplicates all the children of // distinct operators, and applies the regular aggregate operators. 
val firstAggregateGroupBy = groupByAttrs ++ distinctAggChildAttrs :+ gid val firstAggregate = Aggregate( http://git-wip-us.apache.org/repos/asf/spark/blob/3591bd22/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index b3795b4..92c6632 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -182,7 +182,7 @@ case class HiveTableScanExec( protected override def doExecute(): RDD[InternalRow] = { // Using dummyCallSite, as getCallSite can turn out to be expensive with -// with multiple partitions. +// multiple partitions. val rdd = if (!relation.isPartitioned) { Utils.withDummyCallSite(sqlContext.sparkContext) { hadoopReader.makeRDDForTable(hiveQlTable) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SQL][CATALYST][MINOR] update some error comments
Repository: spark Updated Branches: refs/heads/master a9f685bb7 -> e9332f600 [SQL][CATALYST][MINOR] update some error comments ## What changes were proposed in this pull request? This PR corrects two comment errors: 1. change "as low a possible" to "as low as possible" in RewriteDistinctAggregates.scala 2. delete the redundant word "with" in HiveTableScanExec's doExecute() method ## How was this patch tested? Existing unit tests. Closes #22694 from CarolinePeng/update_comment. Authored-by: 彭灿00244106 <00244106@zte.intra> Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e9332f60 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e9332f60 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e9332f60 Branch: refs/heads/master Commit: e9332f600eb4f275b3bff368863a68c2a4349182 Parents: a9f685b Author: 彭灿00244106 <00244106@zte.intra> Authored: Wed Oct 17 12:45:13 2018 +0800 Committer: hyukjinkwon Committed: Wed Oct 17 12:45:13 2018 +0800 -- .../spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala | 4 ++-- .../org/apache/spark/sql/hive/execution/HiveTableScanExec.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e9332f60/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala index 4448ace..b946800 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala @@ -95,7 +95,7 @@ import org.apache.spark.sql.types.IntegerType * * This rule duplicates the input data by two or more times (# distinct groups + an optional * non-distinct group). This will put quite a bit of memory pressure of the used aggregate and - * exchange operators. Keeping the number of distinct groups as low a possible should be priority, + * exchange operators. Keeping the number of distinct groups as low as possible should be priority, * we could improve this in the current rule by applying more advanced expression canonicalization * techniques. */ @@ -241,7 +241,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { groupByAttrs ++ distinctAggChildAttrs ++ Seq(gid) ++ regularAggChildAttrMap.map(_._2), a.child) - // Construct the first aggregate operator. This de-duplicates the all the children of + // Construct the first aggregate operator. This de-duplicates all the children of // distinct operators, and applies the regular aggregate operators. 
val firstAggregateGroupBy = groupByAttrs ++ distinctAggChildAttrs :+ gid val firstAggregate = Aggregate( http://git-wip-us.apache.org/repos/asf/spark/blob/e9332f60/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index b3795b4..92c6632 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -182,7 +182,7 @@ case class HiveTableScanExec( protected override def doExecute(): RDD[InternalRow] = { // Using dummyCallSite, as getCallSite can turn out to be expensive with -// with multiple partitions. +// multiple partitions. val rdd = if (!relation.isPartitioned) { Utils.withDummyCallSite(sqlContext.sparkContext) { hadoopReader.makeRDDForTable(hiveQlTable) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r30106 - in /dev/spark/3.0.0-SNAPSHOT-2018_10_16_20_02-e9af946-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Oct 17 03:20:33 2018 New Revision: 30106 Log: Apache Spark 3.0.0-SNAPSHOT-2018_10_16_20_02-e9af946 docs [This commit notification would consist of 1478 parts, which exceeds the 50-part limit, so it was shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25734][SQL] Literal should have a value corresponding to dataType
Repository: spark Updated Branches: refs/heads/master e9af9460b -> a9f685bb7 [SPARK-25734][SQL] Literal should have a value corresponding to dataType ## What changes were proposed in this pull request? `Literal.value` should have a value a value corresponding to `dataType`. This pr added code to verify it and fixed the existing tests to do so. ## How was this patch tested? Modified the existing tests. Closes #22724 from maropu/SPARK-25734. Authored-by: Takeshi Yamamuro Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a9f685bb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a9f685bb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a9f685bb Branch: refs/heads/master Commit: a9f685bb704dd683fec2da77c4e1f723b14cfe8d Parents: e9af946 Author: Takeshi Yamamuro Authored: Wed Oct 17 11:02:39 2018 +0800 Committer: Wenchen Fan Committed: Wed Oct 17 11:02:39 2018 +0800 -- .../apache/spark/sql/kafka010/KafkaWriter.scala | 2 +- .../sql/catalyst/expressions/literals.scala | 46 +++- .../catalyst/analysis/TypeCoercionSuite.scala | 30 +++-- .../expressions/JsonExpressionsSuite.scala | 16 --- .../expressions/NullExpressionsSuite.scala | 6 ++- .../expressions/SortOrderExpressionsSuite.scala | 10 ++--- .../catalyst/expressions/TimeWindowSuite.scala | 4 +- .../expressions/aggregate/PercentileSuite.scala | 11 +++-- .../command/AnalyzeColumnCommand.scala | 4 +- .../execution/joins/BroadcastJoinSuite.scala| 2 +- 10 files changed, 92 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a9f685bb/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala index fc09938..e1a9191 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala @@ -52,7 +52,7 @@ private[kafka010] object KafkaWriter extends Logging { s"'$TOPIC_ATTRIBUTE_NAME' attribute is present. 
Use the " + s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a topic.") } else { -Literal(topic.get, StringType) +Literal.create(topic.get, StringType) } ).dataType match { case StringType => // good http://git-wip-us.apache.org/repos/asf/spark/blob/a9f685bb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala index 2bcbb92..34d2528 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala @@ -40,9 +40,10 @@ import org.json4s.JsonAST._ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection} import org.apache.spark.sql.catalyst.expressions.codegen._ -import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types._ +import org.apache.spark.util.Utils object Literal { val TrueLiteral: Literal = Literal(true, BooleanType) @@ -196,6 +197,47 @@ object Literal { case other => throw new RuntimeException(s"no default for type $dataType") } + + private[expressions] def validateLiteralValue(value: Any, dataType: DataType): Unit = { +def doValidate(v: Any, dataType: DataType): Boolean = dataType match { + case _ if v == null => true + case BooleanType => v.isInstanceOf[Boolean] + case ByteType => v.isInstanceOf[Byte] + case ShortType => v.isInstanceOf[Short] + case IntegerType | DateType => v.isInstanceOf[Int] + case LongType | TimestampType => v.isInstanceOf[Long] + case FloatType => v.isInstanceOf[Float] + case DoubleType => v.isInstanceOf[Double] + case _: DecimalType => v.isInstanceOf[Decimal] + case CalendarIntervalType => v.isInstanceOf[CalendarInterval] + case BinaryType => v.isInstanceOf[Array[Byte]] + case StringType => v.isInstanceOf[UTF8String] + case st: StructType => +v.isInstanceO
[1/2] spark git commit: [SPARK-25393][SQL] Adding new function from_csv()
Repository: spark Updated Branches: refs/heads/master 9d4dd7992 -> e9af9460b http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala deleted file mode 100644 index 492a21b..000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources.csv - -import java.nio.charset.StandardCharsets -import java.util.{Locale, TimeZone} - -import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling} -import org.apache.commons.lang3.time.FastDateFormat - -import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.util._ - -class CSVOptions( -@transient val parameters: CaseInsensitiveMap[String], -val columnPruning: Boolean, -defaultTimeZoneId: String, -defaultColumnNameOfCorruptRecord: String) - extends Logging with Serializable { - - def this( -parameters: Map[String, String], -columnPruning: Boolean, -defaultTimeZoneId: String, -defaultColumnNameOfCorruptRecord: String = "") = { - this( -CaseInsensitiveMap(parameters), -columnPruning, -defaultTimeZoneId, -defaultColumnNameOfCorruptRecord) - } - - private def getChar(paramName: String, default: Char): Char = { -val paramValue = parameters.get(paramName) -paramValue match { - case None => default - case Some(null) => default - case Some(value) if value.length == 0 => '\u' - case Some(value) if value.length == 1 => value.charAt(0) - case _ => throw new RuntimeException(s"$paramName cannot be more than one character") -} - } - - private def getInt(paramName: String, default: Int): Int = { -val paramValue = parameters.get(paramName) -paramValue match { - case None => default - case Some(null) => default - case Some(value) => try { -value.toInt - } catch { -case e: NumberFormatException => - throw new RuntimeException(s"$paramName should be an integer. 
Found $value") - } -} - } - - private def getBool(paramName: String, default: Boolean = false): Boolean = { -val param = parameters.getOrElse(paramName, default.toString) -if (param == null) { - default -} else if (param.toLowerCase(Locale.ROOT) == "true") { - true -} else if (param.toLowerCase(Locale.ROOT) == "false") { - false -} else { - throw new Exception(s"$paramName flag can be true or false") -} - } - - val delimiter = CSVUtils.toChar( -parameters.getOrElse("sep", parameters.getOrElse("delimiter", ","))) - val parseMode: ParseMode = -parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode) - val charset = parameters.getOrElse("encoding", -parameters.getOrElse("charset", StandardCharsets.UTF_8.name())) - - val quote = getChar("quote", '\"') - val escape = getChar("escape", '\\') - val charToEscapeQuoteEscaping = parameters.get("charToEscapeQuoteEscaping") match { -case None => None -case Some(null) => None -case Some(value) if value.length == 0 => None -case Some(value) if value.length == 1 => Some(value.charAt(0)) -case _ => - throw new RuntimeException("charToEscapeQuoteEscaping cannot be more than one character") - } - val comment = getChar("comment", '\u') - - val headerFlag = getBool("header") - val inferSchemaFlag = getBool("inferSchema") - val ignoreLeadingWhiteSpaceInRead = getBool("ignoreLeadingWhiteSpace", default = false) - val ignoreTrailingWhiteSpaceInRead = getBool("ignoreTrailingWhiteSpace", default = false) - - // For write, both options were `true` by default. We leave it as `true` for - // backwards compatibility. - val ignoreLeadingWhiteSpaceFlagInWrite = getBool("ignoreLea
[2/2] spark git commit: [SPARK-25393][SQL] Adding new function from_csv()
[SPARK-25393][SQL] Adding new function from_csv() ## What changes were proposed in this pull request? The PR adds new function `from_csv()` similar to `from_json()` to parse columns with CSV strings. I added the following methods: ```Scala def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column ``` and this signature to call it from Python, R and Java: ```Scala def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column ``` ## How was this patch tested? Added new test suites `CsvExpressionsSuite`, `CsvFunctionsSuite` and sql tests. Closes #22379 from MaxGekk/from_csv. Lead-authored-by: Maxim Gekk Co-authored-by: Maxim Gekk Co-authored-by: Hyukjin Kwon Co-authored-by: hyukjinkwon Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e9af9460 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e9af9460 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e9af9460 Branch: refs/heads/master Commit: e9af9460bc008106b670abac44a869721bfde42a Parents: 9d4dd79 Author: Maxim Gekk Authored: Wed Oct 17 09:32:05 2018 +0800 Committer: hyukjinkwon Committed: Wed Oct 17 09:32:05 2018 +0800 -- R/pkg/NAMESPACE | 1 + R/pkg/R/functions.R | 40 ++- R/pkg/R/generics.R | 4 + R/pkg/tests/fulltests/test_sparkSQL.R | 7 + python/pyspark/sql/functions.py | 37 +- sql/catalyst/pom.xml| 6 + .../catalyst/analysis/FunctionRegistry.scala| 5 +- .../spark/sql/catalyst/csv/CSVExprUtils.scala | 82 + .../sql/catalyst/csv/CSVHeaderChecker.scala | 131 +++ .../spark/sql/catalyst/csv/CSVOptions.scala | 217 .../sql/catalyst/csv/UnivocityParser.scala | 351 ++ .../sql/catalyst/expressions/ExprUtils.scala| 45 +++ .../catalyst/expressions/csvExpressions.scala | 120 +++ .../catalyst/expressions/jsonExpressions.scala | 21 +- .../sql/catalyst/util/FailureSafeParser.scala | 80 + .../sql/catalyst/csv/CSVExprUtilsSuite.scala| 61 .../expressions/CsvExpressionsSuite.scala | 158 + .../org/apache/spark/sql/DataFrameReader.scala | 5 +- .../datasources/FailureSafeParser.scala | 82 - .../datasources/csv/CSVDataSource.scala | 1 + .../datasources/csv/CSVFileFormat.scala | 1 + .../datasources/csv/CSVHeaderChecker.scala | 131 --- .../datasources/csv/CSVInferSchema.scala| 1 + .../execution/datasources/csv/CSVOptions.scala | 217 .../execution/datasources/csv/CSVUtils.scala| 67 +--- .../datasources/csv/UnivocityGenerator.scala| 1 + .../datasources/csv/UnivocityParser.scala | 352 --- .../datasources/json/JsonDataSource.scala | 1 + .../scala/org/apache/spark/sql/functions.scala | 32 ++ .../sql-tests/inputs/csv-functions.sql | 9 + .../sql-tests/results/csv-functions.sql.out | 69 .../apache/spark/sql/CsvFunctionsSuite.scala| 62 .../datasources/csv/CSVInferSchemaSuite.scala | 1 + .../datasources/csv/CSVUtilsSuite.scala | 61 .../datasources/csv/UnivocityParserSuite.scala | 2 +- 35 files changed, 1531 insertions(+), 930 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 96ff389..c512284 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -274,6 +274,7 @@ exportMethods("%<=>%", "floor", "format_number", "format_string", + "from_csv", "from_json", "from_unixtime", "from_utc_timestamp", http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/R/pkg/R/functions.R -- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 6a8fef5..d2ca1d6 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -188,6 +188,7 @@ 
NULL #' \item \code{to_json}: it is the column containing the struct, array of the structs, #' the map or array of maps. #' \item \code{from_json}: it is the column containing the JSON string. +#' \item \code{from_csv}: it is the column containing the CSV string. #' } #' @param y Column to compute on. #' @param value A value to compute on. @@ -196,6 +197,13 @@ NULL #' \item \code{array_position}: a value to locate in the given array. #' \item \code{array_remove}: a value to remove in the given array. #'
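For reference, a minimal usage sketch of the new Scala API described above (the sample data, session setup, and option values are illustrative, not taken from the patch):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val spark = SparkSession.builder().master("local[*]").appName("from_csv-example").getOrCreate()
import spark.implicits._

// A column holding CSV-encoded records.
val df = Seq("1,Alice", "2,Bob").toDF("raw")
val schema = new StructType().add("id", IntegerType).add("name", StringType)

// from_csv(e: Column, schema: StructType, options: Map[String, String]) mirrors
// from_json; the options map takes the usual CSV reader settings such as "sep" or "mode".
val parsed = df.select(from_csv($"raw", schema, Map("sep" -> ",")).as("rec"))
parsed.select($"rec.id", $"rec.name").show()
```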
svn commit: r30104 - in /dev/spark/3.0.0-SNAPSHOT-2018_10_16_16_03-9d4dd79-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Oct 16 23:20:50 2018 New Revision: 30104 Log: Apache Spark 3.0.0-SNAPSHOT-2018_10_16_16_03-9d4dd79 docs [This commit notification would consist of 1478 parts, which exceeds the 50-part limit, so it was shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25631][SPARK-25632][SQL][TEST] Improve the test runtime of KafkaRDDSuite
Repository: spark Updated Branches: refs/heads/master bd2c44713 -> 9d4dd7992 [SPARK-25631][SPARK-25632][SQL][TEST] Improve the test runtime of KafkaRDDSuite ## What changes were proposed in this pull request? Set a reasonable poll timeout that's used while consuming topics/partitions from Kafka. In the absence of it, a default of 2 minutes is used as the timeout value, and all the negative tests take a minimum of 2 minutes to execute. After this change, we save about 4 minutes in this suite. ## How was this patch tested? Test fix. Closes #22670 from dilipbiswal/SPARK-25631. Authored-by: Dilip Biswal Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d4dd799 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d4dd799 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d4dd799 Branch: refs/heads/master Commit: 9d4dd7992bb8c3bd2ae3081734e6a5ed8531fddb Parents: bd2c447 Author: Dilip Biswal Authored: Tue Oct 16 17:49:40 2018 -0500 Committer: Sean Owen Committed: Tue Oct 16 17:49:40 2018 -0500 -- .../apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala | 3 +++ .../org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala | 4 2 files changed, 7 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9d4dd799/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala -- diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index 1974bb1..93d0d2f 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -52,6 +52,9 @@ class DirectKafkaStreamSuite val sparkConf = new SparkConf() .setMaster("local[4]") .setAppName(this.getClass.getSimpleName) +// Set a timeout of 10 seconds that's going to be used to fetch topics/partitions from kafka. +// Othewise the poll timeout defaults to 2 minutes and causes test cases to run longer. +.set("spark.streaming.kafka.consumer.poll.ms", "10000") private var ssc: StreamingContext = _ private var testDir: File = _ http://git-wip-us.apache.org/repos/asf/spark/blob/9d4dd799/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala -- diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala index 561bca5..47bc8fe 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala @@ -41,6 +41,10 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { private val sparkConf = new SparkConf().setMaster("local[4]") .setAppName(this.getClass.getSimpleName) +// Set a timeout of 10 seconds that's going to be used to fetch topics/partitions from kafka. +// Othewise the poll timeout defaults to 2 minutes and causes test cases to run longer. +.set("spark.streaming.kafka.consumer.poll.ms", "10000") + private var sc: SparkContext = _ override def beforeAll { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
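The same setting applies outside these test suites; a minimal sketch (the 10-second value comes from the description above, the application name is illustrative):

```scala
import org.apache.spark.SparkConf

// Without this option the kafka-0-10 consumer polls with a 2 minute default
// timeout when fetching topics/partitions; 10 seconds keeps failures fast.
val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("kafka-poll-timeout-example")
  .set("spark.streaming.kafka.consumer.poll.ms", "10000")
```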
spark git commit: [SPARK-25394][CORE] Add an application status metrics source
Repository: spark Updated Branches: refs/heads/master 703e6da1e -> bd2c44713 [SPARK-25394][CORE] Add an application status metrics source - Exposes several metrics regarding application status as a source, useful to scrape them via jmx instead of mining the metrics rest api. Example use case: prometheus + jmx exporter. - Metrics are gathered when a job ends at the AppStatusListener side, could be more fine-grained but most metrics like tasks completed are also counted by executors. More metrics could be exposed in the future to avoid scraping executors in some scenarios. - a config option `spark.app.status.metrics.enabled` is added to disable/enable these metrics, by default they are disabled. This was manually tested with jmx source enabled and prometheus server on k8s: ![metrics](https://user-images.githubusercontent.com/7945591/45300945-63064d00-b518-11e8-812a-d9b4155ba0c0.png) In the next pic the job delay is shown for repeated pi calculation (Spark action). ![pi](https://user-images.githubusercontent.com/7945591/45329927-89a1a380-b56b-11e8-9cc1-5e76cb83969f.png) Closes #22381 from skonto/add_app_status_metrics. Authored-by: Stavros Kontopoulos Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd2c4471 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd2c4471 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd2c4471 Branch: refs/heads/master Commit: bd2c4471311cd7e948c80b4927a903636ce0ce7e Parents: 703e6da Author: Stavros Kontopoulos Authored: Tue Oct 16 14:57:32 2018 -0700 Committer: Marcelo Vanzin Committed: Tue Oct 16 14:58:26 2018 -0700 -- .../scala/org/apache/spark/SparkContext.scala | 7 +- .../apache/spark/status/AppStatusListener.scala | 33 +++- .../apache/spark/status/AppStatusSource.scala | 85 .../apache/spark/status/AppStatusStore.scala| 7 +- .../org/apache/spark/status/LiveEntity.scala| 2 +- 5 files changed, 125 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bd2c4471/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 0a66dae..10f3168 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -53,7 +53,7 @@ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend} import org.apache.spark.scheduler.local.LocalSchedulerBackend -import org.apache.spark.status.AppStatusStore +import org.apache.spark.status.{AppStatusSource, AppStatusStore} import org.apache.spark.status.api.v1.ThreadStackTrace import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump @@ -418,7 +418,8 @@ class SparkContext(config: SparkConf) extends Logging { // Initialize the app status store and listener before SparkEnv is created so that it gets // all events. 
-_statusStore = AppStatusStore.createLiveStore(conf) +val appStatusSource = AppStatusSource.createSource(conf) +_statusStore = AppStatusStore.createLiveStore(conf, appStatusSource) listenerBus.addToStatusQueue(_statusStore.listener.get) // Create the Spark execution environment (cache, map output tracker, etc) @@ -569,7 +570,7 @@ class SparkContext(config: SparkConf) extends Logging { _executorAllocationManager.foreach { e => _env.metricsSystem.registerSource(e.executorAllocationManagerSource) } - +appStatusSource.foreach(_env.metricsSystem.registerSource(_)) // Make sure the context is stopped if the user forgets about it. This avoids leaving // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM // is killed, though. http://git-wip-us.apache.org/repos/asf/spark/blob/bd2c4471/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala -- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 36aaf67..d52b7e8 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -44,6 +44,7 @@ private[spark] class AppStatusListener( kvstore: ElementTrackingStore, conf: SparkConf, live: Boolean, +appStatusSource: Option[AppStatusSource] = None, lastUpdateTime: Option[Long] = No
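A sketch of how the new source might be consumed, assuming the standard JMX sink and Spark's `spark.metrics.conf.*` property pass-through (everything except `spark.app.status.metrics.enabled` comes from Spark's existing metrics system, not this patch):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("app-status-metrics-example")
  // Turn on the new appStatus metrics source (disabled by default).
  .set("spark.app.status.metrics.enabled", "true")
  // Expose all registered sources over JMX so a jmx_exporter/Prometheus setup can scrape them.
  .set("spark.metrics.conf.*.sink.jmx.class", "org.apache.spark.metrics.sink.JmxSink")

val spark = SparkSession.builder().config(conf).getOrCreate()
spark.range(0L, 1000000L).count() // run a job so the job/stage counters move
spark.stop()
```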
svn commit: r30094 - in /dev/spark/3.0.0-SNAPSHOT-2018_10_16_08_02-703e6da-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Oct 16 15:20:45 2018 New Revision: 30094 Log: Apache Spark 3.0.0-SNAPSHOT-2018_10_16_08_02-703e6da docs [This commit notification would consist of 1478 parts, which exceeds the 50-part limit, so it was shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[1/4] spark git commit: [SPARK-25705][BUILD][STREAMING][TEST-MAVEN] Remove Kafka 0.8 integration
Repository: spark Updated Branches: refs/heads/master 2c664edc0 -> 703e6da1e http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/python/pyspark/streaming/kafka.py -- diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py deleted file mode 100644 index ed2e0e7..000 --- a/python/pyspark/streaming/kafka.py +++ /dev/null @@ -1,506 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import warnings - -from py4j.protocol import Py4JJavaError - -from pyspark.rdd import RDD -from pyspark.storagelevel import StorageLevel -from pyspark.serializers import AutoBatchedSerializer, PickleSerializer, PairDeserializer, \ -NoOpSerializer -from pyspark.streaming import DStream -from pyspark.streaming.dstream import TransformedDStream -from pyspark.streaming.util import TransformFunction - -__all__ = ['Broker', 'KafkaMessageAndMetadata', 'KafkaUtils', 'OffsetRange', - 'TopicAndPartition', 'utf8_decoder'] - - -def utf8_decoder(s): -""" Decode the unicode as UTF-8 """ -if s is None: -return None -return s.decode('utf-8') - - -class KafkaUtils(object): - -@staticmethod -def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None, - storageLevel=StorageLevel.MEMORY_AND_DISK_2, - keyDecoder=utf8_decoder, valueDecoder=utf8_decoder): -""" -Create an input stream that pulls messages from a Kafka Broker. - -:param ssc: StreamingContext object -:param zkQuorum: Zookeeper quorum (hostname:port,hostname:port,..). -:param groupId: The group id for this consumer. -:param topics: Dict of (topic_name -> numPartitions) to consume. -Each partition is consumed in its own thread. -:param kafkaParams: Additional params for Kafka -:param storageLevel: RDD storage level. -:param keyDecoder: A function used to decode key (default is utf8_decoder) -:param valueDecoder: A function used to decode value (default is utf8_decoder) -:return: A DStream object - -.. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. -See SPARK-21893. -""" -warnings.warn( -"Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. 
" -"See SPARK-21893.", -DeprecationWarning) -if kafkaParams is None: -kafkaParams = dict() -kafkaParams.update({ -"zookeeper.connect": zkQuorum, -"group.id": groupId, -"zookeeper.connection.timeout.ms": "1", -}) -if not isinstance(topics, dict): -raise TypeError("topics should be dict") -jlevel = ssc._sc._getJavaStorageLevel(storageLevel) -helper = KafkaUtils._get_helper(ssc._sc) -jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel) -ser = PairDeserializer(NoOpSerializer(), NoOpSerializer()) -stream = DStream(jstream, ssc, ser) -return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1]))) - -@staticmethod -def createDirectStream(ssc, topics, kafkaParams, fromOffsets=None, - keyDecoder=utf8_decoder, valueDecoder=utf8_decoder, - messageHandler=None): -""" -Create an input stream that directly pulls messages from a Kafka Broker and specific offset. - -This is not a receiver based Kafka input stream, it directly pulls the message from Kafka -in each batch duration and processed without storing. - -This does not use Zookeeper to store offsets. The consumed offsets are tracked -by the stream itself. For interoperability with Kafka monitoring tools that depend on -Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. -You can access the offsets used in each batch from the generated RDDs (see - -To recover from driver failures, you have to enable checkpointing in the StreamingContext. -The information on consumed offset can be recovere
[2/4] spark git commit: [SPARK-25705][BUILD][STREAMING][TEST-MAVEN] Remove Kafka 0.8 integration
http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala -- diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala deleted file mode 100644 index 39abe3c..000 --- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import java.util.Properties -import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor} - -import scala.collection.{mutable, Map} -import scala.reflect.{classTag, ClassTag} - -import kafka.common.TopicAndPartition -import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream} -import kafka.message.MessageAndMetadata -import kafka.serializer.Decoder -import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils} -import org.I0Itec.zkclient.ZkClient - -import org.apache.spark.SparkEnv -import org.apache.spark.internal.Logging -import org.apache.spark.storage.{StorageLevel, StreamBlockId} -import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver} -import org.apache.spark.util.ThreadUtils - -/** - * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss. - * It is turned off by default and will be enabled when - * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver - * is that this receiver manages topic-partition/offset itself and updates the offset information - * after data is reliably stored as write-ahead log. Offsets will only be updated when data is - * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated. - * - * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset - * commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams - * will not take effect. - */ -private[streaming] -class ReliableKafkaReceiver[ - K: ClassTag, - V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag]( -kafkaParams: Map[String, String], -topics: Map[String, Int], -storageLevel: StorageLevel) -extends Receiver[(K, V)](storageLevel) with Logging { - - private val groupId = kafkaParams("group.id") - private val AUTO_OFFSET_COMMIT = "auto.commit.enable" - private def conf = SparkEnv.get.conf - - /** High level consumer to connect to Kafka. 
*/ - private var consumerConnector: ConsumerConnector = null - - /** zkClient to connect to Zookeeper to commit the offsets. */ - private var zkClient: ZkClient = null - - /** - * A HashMap to manage the offset for each topic/partition, this HashMap is called in - * synchronized block, so mutable HashMap will not meet concurrency issue. - */ - private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null - - /** A concurrent HashMap to store the stream block id and related offset snapshot. */ - private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null - - /** - * Manage the BlockGenerator in receiver itself for better managing block store and offset - * commit. - */ - private var blockGenerator: BlockGenerator = null - - /** Thread pool running the handlers for receiving message from multiple topics and partitions. */ - private var messageHandlerThreadPool: ThreadPoolExecutor = null - - override def onStart(): Unit = { -logInfo(s"Starting Kafka Consumer Stream with group: $groupId") - -// Initialize the topic-partition / offset hash map. -topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long] - -// Initialize the stream block id / offset snapshot hash map. -blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]() - -// Initialize the block generator for stor
[3/4] spark git commit: [SPARK-25705][BUILD][STREAMING][TEST-MAVEN] Remove Kafka 0.8 integration
http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala -- diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala deleted file mode 100644 index 570affa..000 --- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala +++ /dev/null @@ -1,439 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import java.util.Properties - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer -import scala.util.Random -import scala.util.control.NonFatal - -import kafka.api._ -import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition} -import kafka.consumer.{ConsumerConfig, SimpleConsumer} - -import org.apache.spark.SparkException -import org.apache.spark.annotation.DeveloperApi - -/** - * :: DeveloperApi :: - * Convenience methods for interacting with a Kafka cluster. - * See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol";> - * A Guide To The Kafka Protocol for more details on individual api calls. - * @param kafkaParams Kafka http://kafka.apache.org/documentation.html#configuration";> - * configuration parameters. 
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), - * NOT zookeeper servers, specified in host1:port1,host2:port2 form - */ -@DeveloperApi -@deprecated("Update to Kafka 0.10 integration", "2.3.0") -class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { - import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig} - - // ConsumerConfig isn't serializable - @transient private var _config: SimpleConsumerConfig = null - - def config: SimpleConsumerConfig = this.synchronized { -if (_config == null) { - _config = SimpleConsumerConfig(kafkaParams) -} -_config - } - - def connect(host: String, port: Int): SimpleConsumer = -new SimpleConsumer(host, port, config.socketTimeoutMs, - config.socketReceiveBufferBytes, config.clientId) - - def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] = -findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2)) - - // Metadata api - // scalastyle:off - // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI - // scalastyle:on - - def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = { -val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, - 0, config.clientId, Seq(topic)) -val errs = new Err -withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer => - val resp: TopicMetadataResponse = consumer.send(req) - resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata => -tm.partitionsMetadata.find(_.partitionId == partition) - }.foreach { pm: PartitionMetadata => -pm.leader.foreach { leader => - return Right((leader.host, leader.port)) -} - } -} -Left(errs) - } - - def findLeaders( - topicAndPartitions: Set[TopicAndPartition] -): Either[Err, Map[TopicAndPartition, (String, Int)]] = { -val topics = topicAndPartitions.map(_.topic) -val response = getPartitionMetadata(topics).right -val answer = response.flatMap { tms: Set[TopicMetadata] => - val leaderMap = tms.flatMap { tm: TopicMetadata => -tm.partitionsMetadata.flatMap { pm: PartitionMetadata => - val tp = TopicAndPartition(tm.topic, pm.partitionId) - if (topicAndPartitions(tp)) { -pm.leader.map { l => - tp -> (l.host -> l.port) -} - } else { -None - } -} - }.toMap - - if (leaderMap.keys.size == topicAndPartitions.size) { -Right(leaderMap) - } else { -val missing = topicAndPartitions.diff(leaderMap.keySet) -
[4/4] spark git commit: [SPARK-25705][BUILD][STREAMING][TEST-MAVEN] Remove Kafka 0.8 integration
[SPARK-25705][BUILD][STREAMING][TEST-MAVEN] Remove Kafka 0.8 integration ## What changes were proposed in this pull request? Remove Kafka 0.8 integration ## How was this patch tested? Existing tests, build scripts Closes #22703 from srowen/SPARK-25705. Authored-by: Sean Owen Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/703e6da1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/703e6da1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/703e6da1 Branch: refs/heads/master Commit: 703e6da1ecb52ab5b8f42b3b4cac39f27caa51d8 Parents: 2c664ed Author: Sean Owen Authored: Tue Oct 16 09:10:24 2018 -0500 Committer: Sean Owen Committed: Tue Oct 16 09:10:24 2018 -0500 -- .../org/apache/spark/deploy/SparkSubmit.scala | 6 +- dev/create-release/release-build.sh | 4 +- dev/mima| 2 +- dev/run-tests.py| 1 - dev/sbt-checkstyle | 1 - dev/scalastyle | 1 - dev/sparktestsupport/modules.py | 22 - dev/test-dependencies.sh| 2 +- docs/building-spark.md | 9 - docs/configuration.md | 12 +- docs/streaming-kafka-0-10-integration.md| 5 +- docs/streaming-kafka-0-8-integration.md | 196 - docs/streaming-kafka-integration.md | 53 +- docs/streaming-programming-guide.md | 8 +- docs/structured-streaming-programming-guide.md | 6 +- .../python/streaming/direct_kafka_wordcount.py | 56 -- .../main/python/streaming/kafka_wordcount.py| 56 -- .../streaming/kafka010/ConsumerStrategy.scala | 35 +- .../spark/streaming/kafka010/KafkaUtils.scala | 15 - .../streaming/kafka010/LocationStrategy.scala | 16 +- .../spark/streaming/kafka010/OffsetRange.scala | 8 - .../streaming/kafka010/PerPartitionConfig.scala | 3 - external/kafka-0-8-assembly/pom.xml | 170 external/kafka-0-8/pom.xml | 109 --- .../apache/spark/streaming/kafka/Broker.scala | 68 -- .../kafka/DirectKafkaInputDStream.scala | 233 -- .../spark/streaming/kafka/KafkaCluster.scala| 439 -- .../streaming/kafka/KafkaInputDStream.scala | 142 .../apache/spark/streaming/kafka/KafkaRDD.scala | 273 --- .../streaming/kafka/KafkaRDDPartition.scala | 42 - .../spark/streaming/kafka/KafkaTestUtils.scala | 299 --- .../spark/streaming/kafka/KafkaUtils.scala | 806 --- .../spark/streaming/kafka/OffsetRange.scala | 112 --- .../streaming/kafka/ReliableKafkaReceiver.scala | 302 --- .../spark/streaming/kafka/package-info.java | 21 - .../apache/spark/streaming/kafka/package.scala | 23 - .../kafka/JavaDirectKafkaStreamSuite.java | 170 .../streaming/kafka/JavaKafkaRDDSuite.java | 156 .../streaming/kafka/JavaKafkaStreamSuite.java | 144 .../src/test/resources/log4j.properties | 28 - .../kafka/DirectKafkaStreamSuite.scala | 636 --- .../streaming/kafka/KafkaClusterSuite.scala | 86 -- .../spark/streaming/kafka/KafkaRDDSuite.scala | 182 - .../streaming/kafka/KafkaStreamSuite.scala | 92 --- .../kafka/ReliableKafkaStreamSuite.scala| 153 pom.xml | 8 - project/SparkBuild.scala| 12 +- python/docs/pyspark.streaming.rst | 7 - python/pyspark/streaming/dstream.py | 3 +- python/pyspark/streaming/kafka.py | 506 python/pyspark/streaming/tests.py | 287 +-- 51 files changed, 39 insertions(+), 5987 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 13fa6d0..88df732 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -992,9 +992,9 @@ private[spark] object SparkSubmitUtils { // Exposed for testing. // These components are used to make the default exclusion rules for Spark dependencies. - // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka-0-8 and - // other spark-streaming utility components. Underscore is there to differentiate between - // spark-streaming_2.1x and spark-streaming-kafka-0-8-assembly_2.1x + // We need to specify each
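For code that still depends on the removed `spark-streaming-kafka-0-8` module, the retained `spark-streaming-kafka-0-10` integration is the replacement path; a minimal direct-stream sketch (broker address, group id, and topic name are placeholders):

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val conf = new SparkConf().setMaster("local[2]").setAppName("kafka010-direct-stream")
val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// The 0-10 API replaces the KafkaUtils.createStream/createDirectStream variants
// that were removed together with the 0-8 module.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("topicA"), kafkaParams)
)
stream.map(record => (record.key, record.value)).print()

ssc.start()
ssc.awaitTermination()
```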
svn commit: r30091 - in /dev/spark/2.4.1-SNAPSHOT-2018_10_16_06_02-144cb94-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Oct 16 13:20:30 2018 New Revision: 30091 Log: Apache Spark 2.4.1-SNAPSHOT-2018_10_16_06_02-144cb94 docs [This commit notification would consist of 1472 parts, which exceeds the 50-part limit, so it was shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25579][SQL] Use quoted attribute names if needed in pushed ORC predicates
Repository: spark Updated Branches: refs/heads/branch-2.4 77156f8c8 -> 144cb949d [SPARK-25579][SQL] Use quoted attribute names if needed in pushed ORC predicates ## What changes were proposed in this pull request? This PR aims to fix an ORC performance regression at Spark 2.4.0 RCs from Spark 2.3.2. Currently, for column names with `.`, the pushed predicates are ignored. **Test Data** ```scala scala> val df = spark.range(Int.MaxValue).sample(0.2).toDF("col.with.dot") scala> df.write.mode("overwrite").orc("/tmp/orc") ``` **Spark 2.3.2** ```scala scala> spark.sql("set spark.sql.orc.impl=native") scala> spark.sql("set spark.sql.orc.filterPushdown=true") scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show) ++ |col.with.dot| ++ | 5| | 7| | 8| ++ Time taken: 1542 ms scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show) ++ |col.with.dot| ++ | 5| | 7| | 8| ++ Time taken: 152 ms ``` **Spark 2.4.0 RC3** ```scala scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show) ++ |col.with.dot| ++ | 5| | 7| | 8| ++ Time taken: 4074 ms scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show) ++ |col.with.dot| ++ | 5| | 7| | 8| ++ Time taken: 1771 ms ``` ## How was this patch tested? Pass the Jenkins with a newly added test case. Closes #22597 from dongjoon-hyun/SPARK-25579. Authored-by: Dongjoon Hyun Signed-off-by: hyukjinkwon (cherry picked from commit 2c664edc060a41340eb374fd44b5d32c3c06a15c) Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/144cb949 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/144cb949 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/144cb949 Branch: refs/heads/branch-2.4 Commit: 144cb949d597e6cd0e662f2320e983cb6903ecfb Parents: 77156f8 Author: Dongjoon Hyun Authored: Tue Oct 16 20:30:23 2018 +0800 Committer: hyukjinkwon Committed: Tue Oct 16 20:30:40 2018 +0800 -- .../execution/datasources/orc/OrcFilters.scala | 37 +++- .../datasources/orc/OrcQuerySuite.scala | 28 +-- .../sql/execution/datasources/orc/OrcTest.scala | 10 ++ 3 files changed, 46 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/144cb949/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index dbafc46..5b93a60 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -67,6 +67,16 @@ private[sql] object OrcFilters { } } + // Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters + // in order to distinguish predicate pushdown for nested columns. + private def quoteAttributeNameIfNeeded(name: String) : String = { +if (!name.contains("`") && name.contains(".")) { + s"`$name`" +} else { + name +} + } + /** * Create ORC filter as a SearchArgument instance. */ @@ -178,38 +188,47 @@ private[sql] object OrcFilters { // wrapped by a "parent" predicate (`And`, `Or`, or `Not`). 
case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +val quotedName = quoteAttributeNameIfNeeded(attribute) val castedValue = castLiteralValue(value, dataTypeMap(attribute)) -Some(builder.startAnd().equals(attribute, getType(attribute), castedValue).end()) +Some(builder.startAnd().equals(quotedName, getType(attribute), castedValue).end()) case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +val quotedName = quoteAttributeNameIfNeeded(attribute) val castedValue = castLiteralValue(value, dataTypeMap(attribute)) -Some(builder.startAnd().nullSafeEquals(attribute, getType(attribute), castedValue).end()) +Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end()) case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +val quotedName = quoteAttributeNameIfNeeded(attribute) val castedValue = castLiteralValue(value, dataTypeMap(attribute)
spark git commit: [SPARK-25579][SQL] Use quoted attribute names if needed in pushed ORC predicates
Repository: spark Updated Branches: refs/heads/master e028fd3ae -> 2c664edc0 [SPARK-25579][SQL] Use quoted attribute names if needed in pushed ORC predicates ## What changes were proposed in this pull request? This PR aims to fix an ORC performance regression at Spark 2.4.0 RCs from Spark 2.3.2. Currently, for column names with `.`, the pushed predicates are ignored. **Test Data** ```scala scala> val df = spark.range(Int.MaxValue).sample(0.2).toDF("col.with.dot") scala> df.write.mode("overwrite").orc("/tmp/orc") ``` **Spark 2.3.2** ```scala scala> spark.sql("set spark.sql.orc.impl=native") scala> spark.sql("set spark.sql.orc.filterPushdown=true") scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show) ++ |col.with.dot| ++ | 5| | 7| | 8| ++ Time taken: 1542 ms scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show) ++ |col.with.dot| ++ | 5| | 7| | 8| ++ Time taken: 152 ms ``` **Spark 2.4.0 RC3** ```scala scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show) ++ |col.with.dot| ++ | 5| | 7| | 8| ++ Time taken: 4074 ms scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show) ++ |col.with.dot| ++ | 5| | 7| | 8| ++ Time taken: 1771 ms ``` ## How was this patch tested? Pass the Jenkins with a newly added test case. Closes #22597 from dongjoon-hyun/SPARK-25579. Authored-by: Dongjoon Hyun Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c664edc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c664edc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c664edc Branch: refs/heads/master Commit: 2c664edc060a41340eb374fd44b5d32c3c06a15c Parents: e028fd3 Author: Dongjoon Hyun Authored: Tue Oct 16 20:30:23 2018 +0800 Committer: hyukjinkwon Committed: Tue Oct 16 20:30:23 2018 +0800 -- .../execution/datasources/orc/OrcFilters.scala | 37 +++- .../datasources/orc/OrcQuerySuite.scala | 28 +-- .../sql/execution/datasources/orc/OrcTest.scala | 10 ++ 3 files changed, 46 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2c664edc/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 2b17b47..0a64981 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -67,6 +67,16 @@ private[sql] object OrcFilters { } } + // Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters + // in order to distinguish predicate pushdown for nested columns. + private def quoteAttributeNameIfNeeded(name: String) : String = { +if (!name.contains("`") && name.contains(".")) { + s"`$name`" +} else { + name +} + } + /** * Create ORC filter as a SearchArgument instance. */ @@ -215,38 +225,47 @@ private[sql] object OrcFilters { // wrapped by a "parent" predicate (`And`, `Or`, or `Not`). 
       case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
         val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().equals(attribute, getType(attribute), castedValue).end())
+        Some(builder.startAnd().equals(quotedName, getType(attribute), castedValue).end())

       case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
         val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().nullSafeEquals(attribute, getType(attribute), castedValue).end())
+        Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end())

       case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
         val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().lessThan(attribute, getType(attribute), castedValue).end())
+
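To see the fix in isolation, here is a small self-contained Scala sketch (not part of the patch) that mirrors the `quoteAttributeNameIfNeeded` helper added above and prints what it does to a few sample column names. The object name and the sample names are illustrative only; in the committed code, each searchable pushdown case simply hands `quotedName` instead of the raw attribute name to the ORC `SearchArgument` builder, which is what restores predicate pushdown for columns such as `col.with.dot`.

```scala
// Standalone sketch of the quoting rule this commit adds (mirrors
// quoteAttributeNameIfNeeded in OrcFilters.scala); the object name and the
// sample column names below are illustrative, not taken from the patch.
object OrcQuotingSketch {
  // Back-quote a column name containing `.` so that, since ORC 1.5.0 (ORC-323),
  // it is not mistaken for a nested-column path. Names that already contain a
  // backtick are returned unchanged to avoid double quoting.
  def quoteAttributeNameIfNeeded(name: String): String =
    if (!name.contains("`") && name.contains(".")) s"`$name`" else name

  def main(args: Array[String]): Unit = {
    Seq("col.with.dot", "plain_col", "`col.with.dot`").foreach { name =>
      println(s"$name -> ${quoteAttributeNameIfNeeded(name)}")
    }
    // Prints:
    //   col.with.dot -> `col.with.dot`
    //   plain_col -> plain_col
    //   `col.with.dot` -> `col.with.dot`
  }
}
```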
svn commit: r30090 - in /dev/spark/3.0.0-SNAPSHOT-2018_10_16_04_02-e028fd3-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Oct 16 11:20:45 2018 New Revision: 30090 Log: Apache Spark 3.0.0-SNAPSHOT-2018_10_16_04_02-e028fd3 docs [This commit notification would consist of 1481 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r30089 - in /dev/spark/2.4.1-SNAPSHOT-2018_10_16_02_02-77156f8-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Oct 16 09:22:20 2018 New Revision: 30089 Log: Apache Spark 2.4.1-SNAPSHOT-2018_10_16_02_02-77156f8 docs [This commit notification would consist of 1472 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r30086 - in /dev/spark/3.0.0-SNAPSHOT-2018_10_16_00_02-5c7f6b6-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Oct 16 07:16:59 2018 New Revision: 30086 Log: Apache Spark 3.0.0-SNAPSHOT-2018_10_16_00_02-5c7f6b6 docs [This commit notification would consist of 1481 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25736][SQL][TEST] add tests to verify the behavior of multi-column count
Repository: spark Updated Branches: refs/heads/master 5c7f6b663 -> e028fd3ae

[SPARK-25736][SQL][TEST] add tests to verify the behavior of multi-column count

## What changes were proposed in this pull request?

AFAIK multi-column count is not widely supported by mainstream databases (Postgres doesn't support it), and as far as I can tell the SQL standard doesn't define it clearly. Since Spark supports it, we should clearly document the current behavior and add tests to verify it.

## How was this patch tested?

N/A

Closes #22728 from cloud-fan/doc.

Authored-by: Wenchen Fan
Signed-off-by: hyukjinkwon

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e028fd3a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e028fd3a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e028fd3a

Branch: refs/heads/master
Commit: e028fd3aed9e5e4c478f307f0a467b54b73ff0d5
Parents: 5c7f6b6
Author: Wenchen Fan
Authored: Tue Oct 16 15:13:01 2018 +0800
Committer: hyukjinkwon
Committed: Tue Oct 16 15:13:01 2018 +0800
--
 .../catalyst/expressions/aggregate/Count.scala |  2 +-
 .../test/resources/sql-tests/inputs/count.sql  | 27 ++
 .../resources/sql-tests/results/count.sql.out  | 55
 3 files changed, 83 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e028fd3a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
index 40582d0..8cab8e4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
@@ -52,7 +52,7 @@ abstract class CountLike extends DeclarativeAggregate {
   usage = """
     _FUNC_(*) - Returns the total number of retrieved rows, including rows containing null.
-    _FUNC_(expr) - Returns the number of rows for which the supplied expression is non-null.
+    _FUNC_(expr[, expr...]) - Returns the number of rows for which the supplied expression(s) are all non-null.
     _FUNC_(DISTINCT expr[, expr...]) - Returns the number of rows for which the supplied expression(s) are unique and non-null.
   """)

http://git-wip-us.apache.org/repos/asf/spark/blob/e028fd3a/sql/core/src/test/resources/sql-tests/inputs/count.sql
--
diff --git a/sql/core/src/test/resources/sql-tests/inputs/count.sql b/sql/core/src/test/resources/sql-tests/inputs/count.sql
new file mode 100644
index 000..9f9ee4a
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/count.sql
@@ -0,0 +1,27 @@
+-- Test data.
+CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
+(1, 1), (1, 2), (2, 1), (1, 1), (null, 2), (1, null), (null, null)
+AS testData(a, b);
+
+-- count with single expression
+SELECT
+  count(*), count(1), count(null), count(a), count(b), count(a + b), count((a, b))
+FROM testData;
+
+-- distinct count with single expression
+SELECT
+  count(DISTINCT 1),
+  count(DISTINCT null),
+  count(DISTINCT a),
+  count(DISTINCT b),
+  count(DISTINCT (a + b)),
+  count(DISTINCT (a, b))
+FROM testData;
+
+-- count with multiple expressions
+SELECT count(a, b), count(b, a), count(testData.*) FROM testData;
+
+-- distinct count with multiple expressions
+SELECT
+  count(DISTINCT a, b), count(DISTINCT b, a), count(DISTINCT *), count(DISTINCT testData.*)
+FROM testData;

http://git-wip-us.apache.org/repos/asf/spark/blob/e028fd3a/sql/core/src/test/resources/sql-tests/results/count.sql.out
--
diff --git a/sql/core/src/test/resources/sql-tests/results/count.sql.out b/sql/core/src/test/resources/sql-tests/results/count.sql.out
new file mode 100644
index 000..b8a86d4
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/count.sql.out
@@ -0,0 +1,55 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 5
+
+
+-- !query 0
+CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
+(1, 1), (1, 2), (2, 1), (1, 1), (null, 2), (1, null), (null, null)
+AS testData(a, b)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+-- !query 1
+SELECT
+  count(*), count(1), count(null), count(a), count(b), count(a + b), count((a, b))
+FROM testData
+-- !query 1 schema
+struct
+-- !query 1 output
+7 7 0 5 5 4 7
+
+
+-- !query 2
+SELECT
+  count(DISTINCT 1),
+  count(DISTINCT null),
+  count(DISTI
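For readers skimming the digest, the following short runnable sketch (not part of the patch) reproduces the multi-column count behaviour the new golden file documents: it builds the same `testData` view and queries it with a local SparkSession. The object name is illustrative, and the expected results in the comments are derived from the documented rule ("all supplied expressions are non-null") applied to the fixture rows, not copied from the truncated output above.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch of multi-column count semantics on the count.sql fixture.
object MultiColumnCountSketch {
  def main(args: Array[String]): Unit = {
    // Local session just to run the fixture; configuration is illustrative.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("multi-column count sketch")
      .getOrCreate()

    // Same rows as the new count.sql test data (nulls in a, b, or both).
    spark.sql(
      """CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
        |(1, 1), (1, 2), (2, 1), (1, 1), (null, 2), (1, null), (null, null)
        |AS testData(a, b)""".stripMargin)

    // count(a)             -> rows where a is non-null             (expect 5)
    // count(a, b)          -> rows where BOTH a and b are non-null (expect 4)
    // count(DISTINCT a, b) -> distinct fully non-null (a, b) pairs (expect 3)
    spark.sql("SELECT count(a), count(a, b), count(DISTINCT a, b) FROM testData").show()

    spark.stop()
  }
}
```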
spark git commit: [SPARK-25736][SQL][TEST] add tests to verify the behavior of multi-column count
Repository: spark Updated Branches: refs/heads/branch-2.4 8bc7ab03d -> 77156f8c8

[SPARK-25736][SQL][TEST] add tests to verify the behavior of multi-column count

## What changes were proposed in this pull request?

AFAIK multi-column count is not widely supported by mainstream databases (Postgres doesn't support it), and as far as I can tell the SQL standard doesn't define it clearly. Since Spark supports it, we should clearly document the current behavior and add tests to verify it.

## How was this patch tested?

N/A

Closes #22728 from cloud-fan/doc.

Authored-by: Wenchen Fan
Signed-off-by: hyukjinkwon
(cherry picked from commit e028fd3aed9e5e4c478f307f0a467b54b73ff0d5)
Signed-off-by: hyukjinkwon

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77156f8c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77156f8c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77156f8c

Branch: refs/heads/branch-2.4
Commit: 77156f8c81720ec7364b386a95ef1b30713fe55c
Parents: 8bc7ab0
Author: Wenchen Fan
Authored: Tue Oct 16 15:13:01 2018 +0800
Committer: hyukjinkwon
Committed: Tue Oct 16 15:13:19 2018 +0800
--
 .../catalyst/expressions/aggregate/Count.scala |  2 +-
 .../test/resources/sql-tests/inputs/count.sql  | 27 ++
 .../resources/sql-tests/results/count.sql.out  | 55
 3 files changed, 83 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/77156f8c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
index 40582d0..8cab8e4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
@@ -52,7 +52,7 @@ abstract class CountLike extends DeclarativeAggregate {
   usage = """
     _FUNC_(*) - Returns the total number of retrieved rows, including rows containing null.
-    _FUNC_(expr) - Returns the number of rows for which the supplied expression is non-null.
+    _FUNC_(expr[, expr...]) - Returns the number of rows for which the supplied expression(s) are all non-null.
     _FUNC_(DISTINCT expr[, expr...]) - Returns the number of rows for which the supplied expression(s) are unique and non-null.
   """)

http://git-wip-us.apache.org/repos/asf/spark/blob/77156f8c/sql/core/src/test/resources/sql-tests/inputs/count.sql
--
diff --git a/sql/core/src/test/resources/sql-tests/inputs/count.sql b/sql/core/src/test/resources/sql-tests/inputs/count.sql
new file mode 100644
index 000..9f9ee4a
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/count.sql
@@ -0,0 +1,27 @@
+-- Test data.
+CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
+(1, 1), (1, 2), (2, 1), (1, 1), (null, 2), (1, null), (null, null)
+AS testData(a, b);
+
+-- count with single expression
+SELECT
+  count(*), count(1), count(null), count(a), count(b), count(a + b), count((a, b))
+FROM testData;
+
+-- distinct count with single expression
+SELECT
+  count(DISTINCT 1),
+  count(DISTINCT null),
+  count(DISTINCT a),
+  count(DISTINCT b),
+  count(DISTINCT (a + b)),
+  count(DISTINCT (a, b))
+FROM testData;
+
+-- count with multiple expressions
+SELECT count(a, b), count(b, a), count(testData.*) FROM testData;
+
+-- distinct count with multiple expressions
+SELECT
+  count(DISTINCT a, b), count(DISTINCT b, a), count(DISTINCT *), count(DISTINCT testData.*)
+FROM testData;

http://git-wip-us.apache.org/repos/asf/spark/blob/77156f8c/sql/core/src/test/resources/sql-tests/results/count.sql.out
--
diff --git a/sql/core/src/test/resources/sql-tests/results/count.sql.out b/sql/core/src/test/resources/sql-tests/results/count.sql.out
new file mode 100644
index 000..b8a86d4
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/count.sql.out
@@ -0,0 +1,55 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 5
+
+
+-- !query 0
+CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
+(1, 1), (1, 2), (2, 1), (1, 1), (null, 2), (1, null), (null, null)
+AS testData(a, b)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+-- !query 1
+SELECT
+  count(*), count(1), count(null), count(a), count(b), count(a + b), count((a, b))
+FROM testData
+-- !query 1 schema
+struct
+-- !query 1 output
+7 7 0 5