spark git commit: [SPARK-18175][SQL] Improve the test case coverage of implicit type casting
Repository: spark Updated Branches: refs/heads/master 7eb2ca8e3 -> 9ddec8636

[SPARK-18175][SQL] Improve the test case coverage of implicit type casting

### What changes were proposed in this pull request?

So far, we have limited test case coverage of implicit type casting. We need to draw a matrix to find all the possible casting pairs.
- Reorganized the existing test cases
- Added all the possible type casting pairs
- Drew a matrix to show the implicit type casting. The table is very wide and may be hard to review, so you can also access the same table via the link to [a google sheet](https://docs.google.com/spreadsheets/d/19PS4ikrs-Yye_mfu-rmIKYGnNe-NmOTt5DDT1fOD3pI/edit?usp=sharing).

SourceType\CastToType | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | BinaryType | BooleanType | StringType | DateType | TimestampType | ArrayType | MapType | StructType | NullType | CalendarIntervalType | DecimalType | NumericType | IntegralType
--- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | ---
**ByteType** | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | X | X | StringType | X | X | X | X | X | X | X | DecimalType(3, 0) | ByteType | ByteType
**ShortType** | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | X | X | StringType | X | X | X | X | X | X | X | DecimalType(5, 0) | ShortType | ShortType
**IntegerType** | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | X | X | StringType | X | X | X | X | X | X | X | DecimalType(10, 0) | IntegerType | IntegerType
**LongType** | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | X | X | StringType | X | X | X | X | X | X | X | DecimalType(20, 0) | LongType | LongType
**DoubleType** | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | X | X | StringType | X | X | X | X | X | X | X | DecimalType(30, 15) | DoubleType | IntegerType
**FloatType** | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | X | X | StringType | X | X | X | X | X | X | X | DecimalType(14, 7) | FloatType | IntegerType
**Dec(10, 2)** | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | X | X | StringType | X | X | X | X | X | X | X | DecimalType(10, 2) | Dec(10, 2) | IntegerType
**BinaryType** | X | X | X | X | X | X | X | BinaryType | X | StringType | X | X | X | X | X | X | X | X | X | X
**BooleanType** | X | X | X | X | X | X | X | X | BooleanType | StringType | X | X | X | X | X | X | X | X | X | X
**StringType** | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | BinaryType | X | StringType | DateType | TimestampType | X | X | X | X | X | DecimalType(38, 18) | DoubleType | X
**DateType** | X | X | X | X | X | X | X | X | X | StringType | DateType | TimestampType | X | X | X | X | X | X | X | X
**TimestampType** | X | X | X | X | X | X | X | X | X | StringType | DateType | TimestampType | X | X | X | X | X | X | X | X
**ArrayType** | X | X | X | X | X | X | X | X | X | X | X | X | ArrayType* | X | X | X | X | X | X | X
**MapType** | X | X | X | X | X | X | X | X | X | X | X | X | X | MapType* | X | X | X | X | X | X
**StructType** | X | X | X | X | X | X | X | X | X | X | X | X | X | X | StructType* | X | X | X | X | X
**NullType** | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | BinaryType | BooleanType | StringType | DateType | TimestampType | ArrayType | MapType | StructType | NullType | CalendarIntervalType | DecimalType(38, 18) | DoubleType | IntegerType
**CalendarIntervalType** | X | X | X | X | X | X | X | X | X | X | X | X | X | X | X | X | CalendarIntervalType | X | X | X

Note: ArrayType\*, MapType\*, StructType\* are castable only when the internal child types also match; otherwise, not castable.

### How was this patch tested?

N/A

Author: gatorsmile

Closes #15691 from
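To make the matrix above concrete, here is a small hand-written spark-sql illustration (not part of the commit; the queries and expected results are an assumption based on the StringType row of the matrix and Spark 2.x default behaviour):

``` sql
-- Hypothetical spark-sql session, for illustration only.
-- A string operand in arithmetic is implicitly cast to DoubleType:
SELECT 1 + '2';      -- 3.0
-- A string argument to a numeric function is implicitly cast to DoubleType:
SELECT abs('-3.5');  -- 3.5
```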
spark git commit: [SPARK-18175][SQL] Improve the test case coverage of implicit type casting
Repository: spark Updated Branches: refs/heads/branch-2.1 1e29f0a0d -> 2cf39d638

[SPARK-18175][SQL] Improve the test case coverage of implicit type casting

### What changes were proposed in this pull request?

So far, we have limited test case coverage of implicit type casting. We need to draw a matrix to find all the possible casting pairs.
- Reorganized the existing test cases
- Added all the possible type casting pairs
- Drew a matrix to show the implicit type casting. The table is very wide and may be hard to review, so you can also access the same table via the link to [a google sheet](https://docs.google.com/spreadsheets/d/19PS4ikrs-Yye_mfu-rmIKYGnNe-NmOTt5DDT1fOD3pI/edit?usp=sharing).

SourceType\CastToType | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | BinaryType | BooleanType | StringType | DateType | TimestampType | ArrayType | MapType | StructType | NullType | CalendarIntervalType | DecimalType | NumericType | IntegralType
--- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | ---
**ByteType** | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | X | X | StringType | X | X | X | X | X | X | X | DecimalType(3, 0) | ByteType | ByteType
**ShortType** | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | X | X | StringType | X | X | X | X | X | X | X | DecimalType(5, 0) | ShortType | ShortType
**IntegerType** | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | X | X | StringType | X | X | X | X | X | X | X | DecimalType(10, 0) | IntegerType | IntegerType
**LongType** | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | X | X | StringType | X | X | X | X | X | X | X | DecimalType(20, 0) | LongType | LongType
**DoubleType** | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | X | X | StringType | X | X | X | X | X | X | X | DecimalType(30, 15) | DoubleType | IntegerType
**FloatType** | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | X | X | StringType | X | X | X | X | X | X | X | DecimalType(14, 7) | FloatType | IntegerType
**Dec(10, 2)** | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | X | X | StringType | X | X | X | X | X | X | X | DecimalType(10, 2) | Dec(10, 2) | IntegerType
**BinaryType** | X | X | X | X | X | X | X | BinaryType | X | StringType | X | X | X | X | X | X | X | X | X | X
**BooleanType** | X | X | X | X | X | X | X | X | BooleanType | StringType | X | X | X | X | X | X | X | X | X | X
**StringType** | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | BinaryType | X | StringType | DateType | TimestampType | X | X | X | X | X | DecimalType(38, 18) | DoubleType | X
**DateType** | X | X | X | X | X | X | X | X | X | StringType | DateType | TimestampType | X | X | X | X | X | X | X | X
**TimestampType** | X | X | X | X | X | X | X | X | X | StringType | DateType | TimestampType | X | X | X | X | X | X | X | X
**ArrayType** | X | X | X | X | X | X | X | X | X | X | X | X | ArrayType* | X | X | X | X | X | X | X
**MapType** | X | X | X | X | X | X | X | X | X | X | X | X | X | MapType* | X | X | X | X | X | X
**StructType** | X | X | X | X | X | X | X | X | X | X | X | X | X | X | StructType* | X | X | X | X | X
**NullType** | ByteType | ShortType | IntegerType | LongType | DoubleType | FloatType | Dec(10, 2) | BinaryType | BooleanType | StringType | DateType | TimestampType | ArrayType | MapType | StructType | NullType | CalendarIntervalType | DecimalType(38, 18) | DoubleType | IntegerType
**CalendarIntervalType** | X | X | X | X | X | X | X | X | X | X | X | X | X | X | X | X | CalendarIntervalType | X | X | X

Note: ArrayType\*, MapType\*, StructType\* are castable only when the internal child types also match; otherwise, not castable.

### How was this patch tested?

N/A

Author: gatorsmile

Closes #15691 from
[2/3] spark git commit: [SPARK-17963][SQL][DOCUMENTATION] Add examples (extend) in each expression and improve documentation
http://git-wip-us.apache.org/repos/asf/spark/blob/1e29f0a0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala index 5152265..a60494a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala @@ -139,8 +139,12 @@ abstract class BinaryMathExpression(f: (Double, Double) => Double, name: String) * evaluated by the optimizer during constant folding. */ @ExpressionDescription( - usage = "_FUNC_() - Returns Euler's number, E.", - extended = "> SELECT _FUNC_();\n 2.718281828459045") + usage = "_FUNC_() - Returns Euler's number, e.", + extended = """ +Examples: + > SELECT _FUNC_(); + 2.718281828459045 + """) case class EulerNumber() extends LeafMathExpression(math.E, "E") /** @@ -148,8 +152,12 @@ case class EulerNumber() extends LeafMathExpression(math.E, "E") * evaluated by the optimizer during constant folding. */ @ExpressionDescription( - usage = "_FUNC_() - Returns PI.", - extended = "> SELECT _FUNC_();\n 3.141592653589793") + usage = "_FUNC_() - Returns pi.", + extended = """ +Examples: + > SELECT _FUNC_(); + 3.141592653589793 + """) case class Pi() extends LeafMathExpression(math.Pi, "PI") @@ -158,29 +166,61 @@ case class Pi() extends LeafMathExpression(math.Pi, "PI") +// scalastyle:off line.size.limit @ExpressionDescription( - usage = "_FUNC_(x) - Returns the arc cosine of x if -1<=x<=1 or NaN otherwise.", - extended = "> SELECT _FUNC_(1);\n 0.0\n> SELECT _FUNC_(2);\n NaN") + usage = "_FUNC_(expr) - Returns the inverse cosine (a.k.a. arccosine) of `expr` if -1<=`expr`<=1 or NaN otherwise.", + extended = """ +Examples: + > SELECT _FUNC_(1); + 0.0 + > SELECT _FUNC_(2); + NaN + """) +// scalastyle:on line.size.limit case class Acos(child: Expression) extends UnaryMathExpression(math.acos, "ACOS") +// scalastyle:off line.size.limit @ExpressionDescription( - usage = "_FUNC_(x) - Returns the arc sin of x if -1<=x<=1 or NaN otherwise.", - extended = "> SELECT _FUNC_(0);\n 0.0\n> SELECT _FUNC_(2);\n NaN") + usage = "_FUNC_(expr) - Returns the inverse sine (a.k.a. arcsine) the arc sin of `expr` if -1<=`expr`<=1 or NaN otherwise.", + extended = """ +Examples: + > SELECT _FUNC_(0); + 0.0 + > SELECT _FUNC_(2); + NaN + """) +// scalastyle:on line.size.limit case class Asin(child: Expression) extends UnaryMathExpression(math.asin, "ASIN") +// scalastyle:off line.size.limit @ExpressionDescription( - usage = "_FUNC_(x) - Returns the arc tangent.", - extended = "> SELECT _FUNC_(0);\n 0.0") + usage = "_FUNC_(expr) - Returns the inverse tangent (a.k.a. 
arctangent).", + extended = """ +Examples: + > SELECT _FUNC_(0); + 0.0 + """) +// scalastyle:on line.size.limit case class Atan(child: Expression) extends UnaryMathExpression(math.atan, "ATAN") @ExpressionDescription( - usage = "_FUNC_(x) - Returns the cube root of a double value.", - extended = "> SELECT _FUNC_(27.0);\n 3.0") + usage = "_FUNC_(expr) - Returns the cube root of `expr`.", + extended = """ +Examples: + > SELECT _FUNC_(27.0); + 3.0 + """) case class Cbrt(child: Expression) extends UnaryMathExpression(math.cbrt, "CBRT") @ExpressionDescription( - usage = "_FUNC_(x) - Returns the smallest integer not smaller than x.", - extended = "> SELECT _FUNC_(-0.1);\n 0\n> SELECT _FUNC_(5);\n 5") + usage = "_FUNC_(expr) - Returns the smallest integer not smaller than `expr`.", + extended = """ +Examples: + > SELECT _FUNC_(-0.1); + 0 + > SELECT _FUNC_(5); + 5 + """) case class Ceil(child: Expression) extends UnaryMathExpression(math.ceil, "CEIL") { override def dataType: DataType = child.dataType match { case dt @ DecimalType.Fixed(_, 0) => dt @@ -208,13 +248,21 @@ case class Ceil(child: Expression) extends UnaryMathExpression(math.ceil, "CEIL" } @ExpressionDescription( - usage = "_FUNC_(x) - Returns the cosine of x.", - extended = "> SELECT _FUNC_(0);\n 1.0") + usage = "_FUNC_(expr) - Returns the cosine of `expr`.", + extended = """ +Examples: + > SELECT _FUNC_(0); + 1.0 + """) case class Cos(child: Expression) extends UnaryMathExpression(math.cos, "COS")
[3/3] spark git commit: [SPARK-17963][SQL][DOCUMENTATION] Add examples (extend) in each expression and improve documentation
[SPARK-17963][SQL][DOCUMENTATION] Add examples (extend) in each expression and improve documentation ## What changes were proposed in this pull request? This PR proposes to change the documentation for functions. Please refer the discussion from https://github.com/apache/spark/pull/15513 The changes include - Re-indent the documentation - Add examples/arguments in `extended` where the arguments are multiple or specific format (e.g. xml/ json). For examples, the documentation was updated as below: ### Functions with single line usage **Before** - `pow` ``` sql Usage: pow(x1, x2) - Raise x1 to the power of x2. Extended Usage: > SELECT pow(2, 3); 8.0 ``` - `current_timestamp` ``` sql Usage: current_timestamp() - Returns the current timestamp at the start of query evaluation. Extended Usage: No example for current_timestamp. ``` **After** - `pow` ``` sql Usage: pow(expr1, expr2) - Raises `expr1` to the power of `expr2`. Extended Usage: Examples: > SELECT pow(2, 3); 8.0 ``` - `current_timestamp` ``` sql Usage: current_timestamp() - Returns the current timestamp at the start of query evaluation. Extended Usage: No example/argument for current_timestamp. ``` ### Functions with (already) multiple line usage **Before** - `approx_count_distinct` ``` sql Usage: approx_count_distinct(expr) - Returns the estimated cardinality by HyperLogLog++. approx_count_distinct(expr, relativeSD=0.05) - Returns the estimated cardinality by HyperLogLog++ with relativeSD, the maximum estimation error allowed. Extended Usage: No example for approx_count_distinct. ``` - `percentile_approx` ``` sql Usage: percentile_approx(col, percentage [, accuracy]) - Returns the approximate percentile value of numeric column `col` at the given percentage. The value of percentage must be between 0.0 and 1.0. The `accuracy` parameter (default: 1) is a positive integer literal which controls approximation accuracy at the cost of memory. Higher value of `accuracy` yields better accuracy, `1.0/accuracy` is the relative error of the approximation. percentile_approx(col, array(percentage1 [, percentage2]...) [, accuracy]) - Returns the approximate percentile array of column `col` at the given percentage array. Each value of the percentage array must be between 0.0 and 1.0. The `accuracy` parameter (default: 1) is a positive integer literal which controls approximation accuracy at the cost of memory. Higher value of `accuracy` yields better accuracy, `1.0/accuracy` is the relative error of the approximation. Extended Usage: No example for percentile_approx. ``` **After** - `approx_count_distinct` ``` sql Usage: approx_count_distinct(expr[, relativeSD]) - Returns the estimated cardinality by HyperLogLog++. `relativeSD` defines the maximum estimation error allowed. Extended Usage: No example/argument for approx_count_distinct. ``` - `percentile_approx` ``` sql Usage: percentile_approx(col, percentage [, accuracy]) - Returns the approximate percentile value of numeric column `col` at the given percentage. The value of percentage must be between 0.0 and 1.0. The `accuracy` parameter (default: 1) is a positive numeric literal which controls approximation accuracy at the cost of memory. Higher value of `accuracy` yields better accuracy, `1.0/accuracy` is the relative error of the approximation. When `percentage` is an array, each value of the percentage array must be between 0.0 and 1.0. In this case, returns the approximate percentile array of column `col` at the given percentage array. 
Extended Usage: Examples: > SELECT percentile_approx(10.0, array(0.5, 0.4, 0.1), 100); [10.0,10.0,10.0] > SELECT percentile_approx(10.0, 0.5, 100); 10.0 ``` ## How was this patch tested? Manually tested **When examples are multiple** ``` sql spark-sql> describe function extended reflect; Function: reflect Class: org.apache.spark.sql.catalyst.expressions.CallMethodViaReflection Usage: reflect(class, method[, arg1[, arg2 ..]]) - Calls a method with reflection. Extended Usage: Examples: > SELECT reflect('java.util.UUID', 'randomUUID'); c33fb387-8500-4bfa-81d2-6e0e3e930df2 > SELECT reflect('java.util.UUID', 'fromString', 'a5cf6c42-0c85-418f-af6c-3e4e5b1328f2'); a5cf6c42-0c85-418f-af6c-3e4e5b1328f2 ``` **When `Usage` is in single line** ``` sql spark-sql> describe function extended min; Function: min Class: org.apache.spark.sql.catalyst.expressions.aggregate.Min Usage: min(expr) - Returns the minimum value of `expr`. Extended Usage: No example/argument for min. ``` **When `Usage` is already in multiple lines** ``` sql spark-sql> describe function extended
[1/3] spark git commit: [SPARK-17963][SQL][DOCUMENTATION] Add examples (extend) in each expression and improve documentation
Repository: spark Updated Branches: refs/heads/branch-2.1 5ea2f9e5e -> 1e29f0a0d http://git-wip-us.apache.org/repos/asf/spark/blob/1e29f0a0/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index 24d825f..ea53987 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -103,18 +103,22 @@ case class DescribeFunctionCommand( functionName.funcName.toLowerCase match { case "<>" => Row(s"Function: $functionName") :: - Row(s"Usage: a <> b - Returns TRUE if a is not equal to b") :: Nil + Row("Usage: expr1 <> expr2 - " + +"Returns true if `expr1` is not equal to `expr2`.") :: Nil case "!=" => Row(s"Function: $functionName") :: - Row(s"Usage: a != b - Returns TRUE if a is not equal to b") :: Nil + Row("Usage: expr1 != expr2 - " + +"Returns true if `expr1` is not equal to `expr2`.") :: Nil case "between" => -Row(s"Function: between") :: - Row(s"Usage: a [NOT] BETWEEN b AND c - " + -s"evaluate if a is [not] in between b and c") :: Nil +Row("Function: between") :: + Row("Usage: expr1 [NOT] BETWEEN expr2 AND expr3 - " + +"evaluate if `expr1` is [not] in between `expr2` and `expr3`.") :: Nil case "case" => -Row(s"Function: case") :: - Row(s"Usage: CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END - " + -s"When a = b, returns c; when a = d, return e; else return f") :: Nil +Row("Function: case") :: + Row("Usage: CASE expr1 WHEN expr2 THEN expr3 " + +"[WHEN expr4 THEN expr5]* [ELSE expr6] END - " + +"When `expr1` = `expr2`, returns `expr3`; " + +"when `expr1` = `expr4`, return `expr5`; else return `expr6`.") :: Nil case _ => try { val info = sparkSession.sessionState.catalog.lookupFunctionInfo(functionName) @@ -126,7 +130,7 @@ case class DescribeFunctionCommand( if (isExtended) { result :+ - Row(s"Extended Usage:\n${replaceFunctionName(info.getExtended, name)}") + Row(s"Extended Usage:${replaceFunctionName(info.getExtended, info.getName)}") } else { result } http://git-wip-us.apache.org/repos/asf/spark/blob/1e29f0a0/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 9a3d93c..6b517bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -85,15 +85,16 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { checkKeywordsExist(sql("describe function extended upper"), "Function: upper", "Class: org.apache.spark.sql.catalyst.expressions.Upper", - "Usage: upper(str) - Returns str with all characters changed to uppercase", + "Usage: upper(str) - Returns `str` with all characters changed to uppercase", "Extended Usage:", + "Examples:", "> SELECT upper('SparkSql');", - "'SPARKSQL'") + "SPARKSQL") checkKeywordsExist(sql("describe functioN Upper"), "Function: upper", "Class: org.apache.spark.sql.catalyst.expressions.Upper", - "Usage: upper(str) - Returns str with all characters changed to uppercase") + "Usage: upper(str) - Returns `str` with all characters changed to uppercase") checkKeywordsNotExist(sql("describe functioN Upper"), "Extended Usage") 
http://git-wip-us.apache.org/repos/asf/spark/blob/1e29f0a0/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index bde3c8a..22d4c92 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1445,34 +1445,34 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { sql("DESCRIBE FUNCTION log"), Row("Class: org.apache.spark.sql.catalyst.expressions.Logarithm") :: Row("Function: log") :: -Row("Usage: log(b, x) - Returns the logarithm of x with
[2/3] spark git commit: [SPARK-17963][SQL][DOCUMENTATION] Add examples (extend) in each expression and improve documentation
http://git-wip-us.apache.org/repos/asf/spark/blob/7eb2ca8e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala index 5152265..a60494a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala @@ -139,8 +139,12 @@ abstract class BinaryMathExpression(f: (Double, Double) => Double, name: String) * evaluated by the optimizer during constant folding. */ @ExpressionDescription( - usage = "_FUNC_() - Returns Euler's number, E.", - extended = "> SELECT _FUNC_();\n 2.718281828459045") + usage = "_FUNC_() - Returns Euler's number, e.", + extended = """ +Examples: + > SELECT _FUNC_(); + 2.718281828459045 + """) case class EulerNumber() extends LeafMathExpression(math.E, "E") /** @@ -148,8 +152,12 @@ case class EulerNumber() extends LeafMathExpression(math.E, "E") * evaluated by the optimizer during constant folding. */ @ExpressionDescription( - usage = "_FUNC_() - Returns PI.", - extended = "> SELECT _FUNC_();\n 3.141592653589793") + usage = "_FUNC_() - Returns pi.", + extended = """ +Examples: + > SELECT _FUNC_(); + 3.141592653589793 + """) case class Pi() extends LeafMathExpression(math.Pi, "PI") @@ -158,29 +166,61 @@ case class Pi() extends LeafMathExpression(math.Pi, "PI") +// scalastyle:off line.size.limit @ExpressionDescription( - usage = "_FUNC_(x) - Returns the arc cosine of x if -1<=x<=1 or NaN otherwise.", - extended = "> SELECT _FUNC_(1);\n 0.0\n> SELECT _FUNC_(2);\n NaN") + usage = "_FUNC_(expr) - Returns the inverse cosine (a.k.a. arccosine) of `expr` if -1<=`expr`<=1 or NaN otherwise.", + extended = """ +Examples: + > SELECT _FUNC_(1); + 0.0 + > SELECT _FUNC_(2); + NaN + """) +// scalastyle:on line.size.limit case class Acos(child: Expression) extends UnaryMathExpression(math.acos, "ACOS") +// scalastyle:off line.size.limit @ExpressionDescription( - usage = "_FUNC_(x) - Returns the arc sin of x if -1<=x<=1 or NaN otherwise.", - extended = "> SELECT _FUNC_(0);\n 0.0\n> SELECT _FUNC_(2);\n NaN") + usage = "_FUNC_(expr) - Returns the inverse sine (a.k.a. arcsine) the arc sin of `expr` if -1<=`expr`<=1 or NaN otherwise.", + extended = """ +Examples: + > SELECT _FUNC_(0); + 0.0 + > SELECT _FUNC_(2); + NaN + """) +// scalastyle:on line.size.limit case class Asin(child: Expression) extends UnaryMathExpression(math.asin, "ASIN") +// scalastyle:off line.size.limit @ExpressionDescription( - usage = "_FUNC_(x) - Returns the arc tangent.", - extended = "> SELECT _FUNC_(0);\n 0.0") + usage = "_FUNC_(expr) - Returns the inverse tangent (a.k.a. 
arctangent).", + extended = """ +Examples: + > SELECT _FUNC_(0); + 0.0 + """) +// scalastyle:on line.size.limit case class Atan(child: Expression) extends UnaryMathExpression(math.atan, "ATAN") @ExpressionDescription( - usage = "_FUNC_(x) - Returns the cube root of a double value.", - extended = "> SELECT _FUNC_(27.0);\n 3.0") + usage = "_FUNC_(expr) - Returns the cube root of `expr`.", + extended = """ +Examples: + > SELECT _FUNC_(27.0); + 3.0 + """) case class Cbrt(child: Expression) extends UnaryMathExpression(math.cbrt, "CBRT") @ExpressionDescription( - usage = "_FUNC_(x) - Returns the smallest integer not smaller than x.", - extended = "> SELECT _FUNC_(-0.1);\n 0\n> SELECT _FUNC_(5);\n 5") + usage = "_FUNC_(expr) - Returns the smallest integer not smaller than `expr`.", + extended = """ +Examples: + > SELECT _FUNC_(-0.1); + 0 + > SELECT _FUNC_(5); + 5 + """) case class Ceil(child: Expression) extends UnaryMathExpression(math.ceil, "CEIL") { override def dataType: DataType = child.dataType match { case dt @ DecimalType.Fixed(_, 0) => dt @@ -208,13 +248,21 @@ case class Ceil(child: Expression) extends UnaryMathExpression(math.ceil, "CEIL" } @ExpressionDescription( - usage = "_FUNC_(x) - Returns the cosine of x.", - extended = "> SELECT _FUNC_(0);\n 1.0") + usage = "_FUNC_(expr) - Returns the cosine of `expr`.", + extended = """ +Examples: + > SELECT _FUNC_(0); + 1.0 + """) case class Cos(child: Expression) extends UnaryMathExpression(math.cos, "COS")
[1/3] spark git commit: [SPARK-17963][SQL][DOCUMENTATION] Add examples (extend) in each expression and improve documentation
Repository: spark Updated Branches: refs/heads/master 3a1bc6f47 -> 7eb2ca8e3 http://git-wip-us.apache.org/repos/asf/spark/blob/7eb2ca8e/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index 24d825f..ea53987 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -103,18 +103,22 @@ case class DescribeFunctionCommand( functionName.funcName.toLowerCase match { case "<>" => Row(s"Function: $functionName") :: - Row(s"Usage: a <> b - Returns TRUE if a is not equal to b") :: Nil + Row("Usage: expr1 <> expr2 - " + +"Returns true if `expr1` is not equal to `expr2`.") :: Nil case "!=" => Row(s"Function: $functionName") :: - Row(s"Usage: a != b - Returns TRUE if a is not equal to b") :: Nil + Row("Usage: expr1 != expr2 - " + +"Returns true if `expr1` is not equal to `expr2`.") :: Nil case "between" => -Row(s"Function: between") :: - Row(s"Usage: a [NOT] BETWEEN b AND c - " + -s"evaluate if a is [not] in between b and c") :: Nil +Row("Function: between") :: + Row("Usage: expr1 [NOT] BETWEEN expr2 AND expr3 - " + +"evaluate if `expr1` is [not] in between `expr2` and `expr3`.") :: Nil case "case" => -Row(s"Function: case") :: - Row(s"Usage: CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END - " + -s"When a = b, returns c; when a = d, return e; else return f") :: Nil +Row("Function: case") :: + Row("Usage: CASE expr1 WHEN expr2 THEN expr3 " + +"[WHEN expr4 THEN expr5]* [ELSE expr6] END - " + +"When `expr1` = `expr2`, returns `expr3`; " + +"when `expr1` = `expr4`, return `expr5`; else return `expr6`.") :: Nil case _ => try { val info = sparkSession.sessionState.catalog.lookupFunctionInfo(functionName) @@ -126,7 +130,7 @@ case class DescribeFunctionCommand( if (isExtended) { result :+ - Row(s"Extended Usage:\n${replaceFunctionName(info.getExtended, name)}") + Row(s"Extended Usage:${replaceFunctionName(info.getExtended, info.getName)}") } else { result } http://git-wip-us.apache.org/repos/asf/spark/blob/7eb2ca8e/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 9a3d93c..6b517bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -85,15 +85,16 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { checkKeywordsExist(sql("describe function extended upper"), "Function: upper", "Class: org.apache.spark.sql.catalyst.expressions.Upper", - "Usage: upper(str) - Returns str with all characters changed to uppercase", + "Usage: upper(str) - Returns `str` with all characters changed to uppercase", "Extended Usage:", + "Examples:", "> SELECT upper('SparkSql');", - "'SPARKSQL'") + "SPARKSQL") checkKeywordsExist(sql("describe functioN Upper"), "Function: upper", "Class: org.apache.spark.sql.catalyst.expressions.Upper", - "Usage: upper(str) - Returns str with all characters changed to uppercase") + "Usage: upper(str) - Returns `str` with all characters changed to uppercase") checkKeywordsNotExist(sql("describe functioN Upper"), "Extended Usage") 
http://git-wip-us.apache.org/repos/asf/spark/blob/7eb2ca8e/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index bde3c8a..22d4c92 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1445,34 +1445,34 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { sql("DESCRIBE FUNCTION log"), Row("Class: org.apache.spark.sql.catalyst.expressions.Logarithm") :: Row("Function: log") :: -Row("Usage: log(b, x) - Returns the logarithm of x with base
[3/3] spark git commit: [SPARK-17963][SQL][DOCUMENTATION] Add examples (extend) in each expression and improve documentation
[SPARK-17963][SQL][DOCUMENTATION] Add examples (extend) in each expression and improve documentation ## What changes were proposed in this pull request? This PR proposes to change the documentation for functions. Please refer the discussion from https://github.com/apache/spark/pull/15513 The changes include - Re-indent the documentation - Add examples/arguments in `extended` where the arguments are multiple or specific format (e.g. xml/ json). For examples, the documentation was updated as below: ### Functions with single line usage **Before** - `pow` ``` sql Usage: pow(x1, x2) - Raise x1 to the power of x2. Extended Usage: > SELECT pow(2, 3); 8.0 ``` - `current_timestamp` ``` sql Usage: current_timestamp() - Returns the current timestamp at the start of query evaluation. Extended Usage: No example for current_timestamp. ``` **After** - `pow` ``` sql Usage: pow(expr1, expr2) - Raises `expr1` to the power of `expr2`. Extended Usage: Examples: > SELECT pow(2, 3); 8.0 ``` - `current_timestamp` ``` sql Usage: current_timestamp() - Returns the current timestamp at the start of query evaluation. Extended Usage: No example/argument for current_timestamp. ``` ### Functions with (already) multiple line usage **Before** - `approx_count_distinct` ``` sql Usage: approx_count_distinct(expr) - Returns the estimated cardinality by HyperLogLog++. approx_count_distinct(expr, relativeSD=0.05) - Returns the estimated cardinality by HyperLogLog++ with relativeSD, the maximum estimation error allowed. Extended Usage: No example for approx_count_distinct. ``` - `percentile_approx` ``` sql Usage: percentile_approx(col, percentage [, accuracy]) - Returns the approximate percentile value of numeric column `col` at the given percentage. The value of percentage must be between 0.0 and 1.0. The `accuracy` parameter (default: 1) is a positive integer literal which controls approximation accuracy at the cost of memory. Higher value of `accuracy` yields better accuracy, `1.0/accuracy` is the relative error of the approximation. percentile_approx(col, array(percentage1 [, percentage2]...) [, accuracy]) - Returns the approximate percentile array of column `col` at the given percentage array. Each value of the percentage array must be between 0.0 and 1.0. The `accuracy` parameter (default: 1) is a positive integer literal which controls approximation accuracy at the cost of memory. Higher value of `accuracy` yields better accuracy, `1.0/accuracy` is the relative error of the approximation. Extended Usage: No example for percentile_approx. ``` **After** - `approx_count_distinct` ``` sql Usage: approx_count_distinct(expr[, relativeSD]) - Returns the estimated cardinality by HyperLogLog++. `relativeSD` defines the maximum estimation error allowed. Extended Usage: No example/argument for approx_count_distinct. ``` - `percentile_approx` ``` sql Usage: percentile_approx(col, percentage [, accuracy]) - Returns the approximate percentile value of numeric column `col` at the given percentage. The value of percentage must be between 0.0 and 1.0. The `accuracy` parameter (default: 1) is a positive numeric literal which controls approximation accuracy at the cost of memory. Higher value of `accuracy` yields better accuracy, `1.0/accuracy` is the relative error of the approximation. When `percentage` is an array, each value of the percentage array must be between 0.0 and 1.0. In this case, returns the approximate percentile array of column `col` at the given percentage array. 
Extended Usage: Examples: > SELECT percentile_approx(10.0, array(0.5, 0.4, 0.1), 100); [10.0,10.0,10.0] > SELECT percentile_approx(10.0, 0.5, 100); 10.0 ``` ## How was this patch tested? Manually tested **When examples are multiple** ``` sql spark-sql> describe function extended reflect; Function: reflect Class: org.apache.spark.sql.catalyst.expressions.CallMethodViaReflection Usage: reflect(class, method[, arg1[, arg2 ..]]) - Calls a method with reflection. Extended Usage: Examples: > SELECT reflect('java.util.UUID', 'randomUUID'); c33fb387-8500-4bfa-81d2-6e0e3e930df2 > SELECT reflect('java.util.UUID', 'fromString', 'a5cf6c42-0c85-418f-af6c-3e4e5b1328f2'); a5cf6c42-0c85-418f-af6c-3e4e5b1328f2 ``` **When `Usage` is in single line** ``` sql spark-sql> describe function extended min; Function: min Class: org.apache.spark.sql.catalyst.expressions.aggregate.Min Usage: min(expr) - Returns the minimum value of `expr`. Extended Usage: No example/argument for min. ``` **When `Usage` is already in multiple lines** ``` sql spark-sql> describe function extended
spark git commit: [SPARK-17470][SQL] unify path for data source table and locationUri for hive serde table
Repository: spark Updated Branches: refs/heads/branch-2.1 2aff2ea81 -> 5ea2f9e5e [SPARK-17470][SQL] unify path for data source table and locationUri for hive serde table ## What changes were proposed in this pull request? Due to a limitation of hive metastore(table location must be directory path, not file path), we always store `path` for data source table in storage properties, instead of the `locationUri` field. However, we should not expose this difference to `CatalogTable` level, but just treat it as a hack in `HiveExternalCatalog`, like we store table schema of data source table in table properties. This PR unifies `path` and `locationUri` outside of `HiveExternalCatalog`, both data source table and hive serde table should use the `locationUri` field. This PR also unifies the way we handle default table location for managed table. Previously, the default table location of hive serde managed table is set by external catalog, but the one of data source table is set by command. After this PR, we follow the hive way and the default table location is always set by external catalog. For managed non-file-based tables, we will assign a default table location and create an empty directory for it, the table location will be removed when the table is dropped. This is reasonable as metastore doesn't care about whether a table is file-based or not, and an empty table directory has no harm. For external non-file-based tables, ideally we can omit the table location, but due to a hive metastore issue, we will assign a random location to it, and remove it right after the table is created. See SPARK-15269 for more details. This is fine as it's well isolated in `HiveExternalCatalog`. To keep the existing behaviour of the `path` option, in this PR we always add the `locationUri` to storage properties using key `path`, before passing storage properties to `DataSource` as data source options. ## How was this patch tested? existing tests. Author: Wenchen FanCloses #15024 from cloud-fan/path. 
(cherry picked from commit 3a1bc6f4780f8384c1211b1335e7394a4a28377e) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ea2f9e5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ea2f9e5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ea2f9e5 Branch: refs/heads/branch-2.1 Commit: 5ea2f9e5e449c02f77635918bfcc7ba7193c97a2 Parents: 2aff2ea Author: Wenchen Fan Authored: Wed Nov 2 18:05:14 2016 -0700 Committer: Yin Huai Committed: Wed Nov 2 18:05:29 2016 -0700 -- R/pkg/inst/tests/testthat/test_sparkSQL.R | 4 +- .../sql/catalyst/catalog/InMemoryCatalog.scala | 40 ++- .../org/apache/spark/sql/DataFrameWriter.scala | 5 +- .../spark/sql/execution/SparkSqlParser.scala| 17 +- .../command/createDataSourceTables.scala| 37 +-- .../spark/sql/execution/command/ddl.scala | 23 +- .../spark/sql/execution/command/tables.scala| 50 ++-- .../sql/execution/datasources/DataSource.scala | 241 ++- .../datasources/DataSourceStrategy.scala| 3 +- .../apache/spark/sql/internal/CatalogImpl.scala | 4 +- .../spark/sql/execution/command/DDLSuite.scala | 1 - .../spark/sql/sources/PathOptionSuite.scala | 136 +++ .../spark/sql/hive/HiveExternalCatalog.scala| 227 +++-- .../spark/sql/hive/HiveMetastoreCatalog.scala | 16 +- .../sql/hive/HiveMetastoreCatalogSuite.scala| 3 +- .../sql/hive/MetastoreDataSourcesSuite.scala| 28 ++- .../spark/sql/hive/MultiDatabaseSuite.scala | 2 +- .../spark/sql/hive/execution/HiveDDLSuite.scala | 14 +- .../sql/hive/execution/SQLQuerySuite.scala | 4 +- 19 files changed, 520 insertions(+), 335 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5ea2f9e5/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index d7fe6b3..ee48baa 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2659,7 +2659,7 @@ test_that("Call DataFrameWriter.save() API in Java without path and check argume # It makes sure that we can omit path argument in write.df API and then it calls # DataFrameWriter.save() without path. expect_error(write.df(df, source = "csv"), - "Error in save : illegal argument - 'path' is not specified") + "Error in save : illegal argument - Expected exactly one path to be specified") expect_error(write.json(df, jsonPath), "Error in json : analysis error -
spark git commit: [SPARK-17470][SQL] unify path for data source table and locationUri for hive serde table
Repository: spark Updated Branches: refs/heads/master fd90541c3 -> 3a1bc6f47 [SPARK-17470][SQL] unify path for data source table and locationUri for hive serde table ## What changes were proposed in this pull request? Due to a limitation of hive metastore(table location must be directory path, not file path), we always store `path` for data source table in storage properties, instead of the `locationUri` field. However, we should not expose this difference to `CatalogTable` level, but just treat it as a hack in `HiveExternalCatalog`, like we store table schema of data source table in table properties. This PR unifies `path` and `locationUri` outside of `HiveExternalCatalog`, both data source table and hive serde table should use the `locationUri` field. This PR also unifies the way we handle default table location for managed table. Previously, the default table location of hive serde managed table is set by external catalog, but the one of data source table is set by command. After this PR, we follow the hive way and the default table location is always set by external catalog. For managed non-file-based tables, we will assign a default table location and create an empty directory for it, the table location will be removed when the table is dropped. This is reasonable as metastore doesn't care about whether a table is file-based or not, and an empty table directory has no harm. For external non-file-based tables, ideally we can omit the table location, but due to a hive metastore issue, we will assign a random location to it, and remove it right after the table is created. See SPARK-15269 for more details. This is fine as it's well isolated in `HiveExternalCatalog`. To keep the existing behaviour of the `path` option, in this PR we always add the `locationUri` to storage properties using key `path`, before passing storage properties to `DataSource` as data source options. ## How was this patch tested? existing tests. Author: Wenchen FanCloses #15024 from cloud-fan/path. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a1bc6f4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a1bc6f4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a1bc6f4 Branch: refs/heads/master Commit: 3a1bc6f4780f8384c1211b1335e7394a4a28377e Parents: fd90541 Author: Wenchen Fan Authored: Wed Nov 2 18:05:14 2016 -0700 Committer: Yin Huai Committed: Wed Nov 2 18:05:14 2016 -0700 -- R/pkg/inst/tests/testthat/test_sparkSQL.R | 4 +- .../sql/catalyst/catalog/InMemoryCatalog.scala | 40 ++- .../org/apache/spark/sql/DataFrameWriter.scala | 5 +- .../spark/sql/execution/SparkSqlParser.scala| 17 +- .../command/createDataSourceTables.scala| 37 +-- .../spark/sql/execution/command/ddl.scala | 23 +- .../spark/sql/execution/command/tables.scala| 50 ++-- .../sql/execution/datasources/DataSource.scala | 241 ++- .../datasources/DataSourceStrategy.scala| 3 +- .../apache/spark/sql/internal/CatalogImpl.scala | 4 +- .../spark/sql/execution/command/DDLSuite.scala | 1 - .../spark/sql/sources/PathOptionSuite.scala | 136 +++ .../spark/sql/hive/HiveExternalCatalog.scala| 227 +++-- .../spark/sql/hive/HiveMetastoreCatalog.scala | 16 +- .../sql/hive/HiveMetastoreCatalogSuite.scala| 3 +- .../sql/hive/MetastoreDataSourcesSuite.scala| 28 ++- .../spark/sql/hive/MultiDatabaseSuite.scala | 2 +- .../spark/sql/hive/execution/HiveDDLSuite.scala | 14 +- .../sql/hive/execution/SQLQuerySuite.scala | 4 +- 19 files changed, 520 insertions(+), 335 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index d7fe6b3..ee48baa 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2659,7 +2659,7 @@ test_that("Call DataFrameWriter.save() API in Java without path and check argume # It makes sure that we can omit path argument in write.df API and then it calls # DataFrameWriter.save() without path. expect_error(write.df(df, source = "csv"), - "Error in save : illegal argument - 'path' is not specified") + "Error in save : illegal argument - Expected exactly one path to be specified") expect_error(write.json(df, jsonPath), "Error in json : analysis error - path file:.*already exists") expect_error(write.text(df, jsonPath), @@ -2667,7 +2667,7 @@ test_that("Call
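To illustrate the `path`/`locationUri` unification described in this commit, here is a hedged sketch (table names and paths are made up; the serde table requires a Hive-enabled SparkSession). After this change both statements end up with the table location carried in the `locationUri` field of `CatalogTable`, and for the data source table the location is still handed to `DataSource` through the `path` option:

``` sql
-- Hypothetical example, for illustration only.
-- Data source table: the location used to live only in storage properties under the key `path`.
CREATE TABLE logs_parquet (id INT, msg STRING)
USING parquet
OPTIONS (path '/tmp/warehouse/logs_parquet');

-- Hive serde table: the location has always been the `locationUri` field.
CREATE EXTERNAL TABLE logs_hive (id INT, msg STRING)
STORED AS PARQUET
LOCATION '/tmp/warehouse/logs_hive';
```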
spark git commit: [SPARK-18214][SQL] Simplify RuntimeReplaceable type coercion
Repository: spark Updated Branches: refs/heads/branch-2.1 1eef8e5cd -> 2aff2ea81 [SPARK-18214][SQL] Simplify RuntimeReplaceable type coercion ## What changes were proposed in this pull request? RuntimeReplaceable is used to create aliases for expressions, but the way it deals with type coercion is pretty weird (each expression is responsible for how to handle type coercion, which does not obey the normal implicit type cast rules). This patch simplifies its handling by allowing the analyzer to traverse into the actual expression of a RuntimeReplaceable. ## How was this patch tested? - Correctness should be guaranteed by existing unit tests already - Removed SQLCompatibilityFunctionSuite and moved it sql-compatibility-functions.sql - Added a new test case in sql-compatibility-functions.sql for verifying explain behavior. Author: Reynold XinCloses #15723 from rxin/SPARK-18214. (cherry picked from commit fd90541c35af2bccf0155467bec8cea7c8865046) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2aff2ea8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2aff2ea8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2aff2ea8 Branch: refs/heads/branch-2.1 Commit: 2aff2ea81d260a47e7762b2990ed62a91e5d0198 Parents: 1eef8e5 Author: Reynold Xin Authored: Wed Nov 2 15:53:02 2016 -0700 Committer: Reynold Xin Committed: Wed Nov 2 15:53:09 2016 -0700 -- .../sql/catalyst/analysis/TypeCoercion.scala| 2 - .../sql/catalyst/expressions/Expression.scala | 30 ++--- .../expressions/datetimeExpressions.scala | 2 - .../catalyst/expressions/nullExpressions.scala | 75 --- .../sql/catalyst/optimizer/finishAnalysis.scala | 2 +- .../expressions/NullFunctionsSuite.scala| 19 ++- .../inputs/sql-compatibility-functions.sql | 25 .../resources/sql-tests/results/array.sql.out | 5 +- .../results/sql-compatibility-functions.sql.out | 124 +++ .../sql/SQLCompatibilityFunctionSuite.scala | 98 --- .../apache/spark/sql/SQLQueryTestSuite.scala| 4 +- 11 files changed, 204 insertions(+), 182 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2aff2ea8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 01b04c0..6662a9e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -528,8 +528,6 @@ object TypeCoercion { NaNvl(l, Cast(r, DoubleType)) case NaNvl(l, r) if l.dataType == FloatType && r.dataType == DoubleType => NaNvl(Cast(l, DoubleType), r) - - case e: RuntimeReplaceable => e.replaceForTypeCoercion() } } http://git-wip-us.apache.org/repos/asf/spark/blob/2aff2ea8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 726a231..221f830 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -186,7 +186,7 @@ abstract class Expression extends TreeNode[Expression] { */ def prettyName: String = 
nodeName.toLowerCase - protected def flatArguments = productIterator.flatMap { + protected def flatArguments: Iterator[Any] = productIterator.flatMap { case t: Traversable[_] => t case single => single :: Nil } @@ -229,26 +229,16 @@ trait Unevaluable extends Expression { * An expression that gets replaced at runtime (currently by the optimizer) into a different * expression for evaluation. This is mainly used to provide compatibility with other databases. * For example, we use this to support "nvl" by replacing it with "coalesce". + * + * A RuntimeReplaceable should have the original parameters along with a "child" expression in the + * case class constructor, and define a normal constructor that accepts only the original + * parameters. For an example, see [[Nvl]]. To make sure the explain plan and expression SQL + * works correctly, the implementation should
spark git commit: [SPARK-18214][SQL] Simplify RuntimeReplaceable type coercion
Repository: spark Updated Branches: refs/heads/master 37d95227a -> fd90541c3 [SPARK-18214][SQL] Simplify RuntimeReplaceable type coercion ## What changes were proposed in this pull request? RuntimeReplaceable is used to create aliases for expressions, but the way it deals with type coercion is pretty weird (each expression is responsible for how to handle type coercion, which does not obey the normal implicit type cast rules). This patch simplifies its handling by allowing the analyzer to traverse into the actual expression of a RuntimeReplaceable. ## How was this patch tested? - Correctness should be guaranteed by existing unit tests already - Removed SQLCompatibilityFunctionSuite and moved it sql-compatibility-functions.sql - Added a new test case in sql-compatibility-functions.sql for verifying explain behavior. Author: Reynold XinCloses #15723 from rxin/SPARK-18214. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd90541c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd90541c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd90541c Branch: refs/heads/master Commit: fd90541c35af2bccf0155467bec8cea7c8865046 Parents: 37d9522 Author: Reynold Xin Authored: Wed Nov 2 15:53:02 2016 -0700 Committer: Reynold Xin Committed: Wed Nov 2 15:53:02 2016 -0700 -- .../sql/catalyst/analysis/TypeCoercion.scala| 2 - .../sql/catalyst/expressions/Expression.scala | 30 ++--- .../expressions/datetimeExpressions.scala | 2 - .../catalyst/expressions/nullExpressions.scala | 75 --- .../sql/catalyst/optimizer/finishAnalysis.scala | 2 +- .../expressions/NullFunctionsSuite.scala| 19 ++- .../inputs/sql-compatibility-functions.sql | 25 .../resources/sql-tests/results/array.sql.out | 5 +- .../results/sql-compatibility-functions.sql.out | 124 +++ .../sql/SQLCompatibilityFunctionSuite.scala | 98 --- .../apache/spark/sql/SQLQueryTestSuite.scala| 4 +- 11 files changed, 204 insertions(+), 182 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fd90541c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 01b04c0..6662a9e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -528,8 +528,6 @@ object TypeCoercion { NaNvl(l, Cast(r, DoubleType)) case NaNvl(l, r) if l.dataType == FloatType && r.dataType == DoubleType => NaNvl(Cast(l, DoubleType), r) - - case e: RuntimeReplaceable => e.replaceForTypeCoercion() } } http://git-wip-us.apache.org/repos/asf/spark/blob/fd90541c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 726a231..221f830 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -186,7 +186,7 @@ abstract class Expression extends TreeNode[Expression] { */ def prettyName: String = nodeName.toLowerCase - protected def flatArguments = productIterator.flatMap { + protected def flatArguments: 
Iterator[Any] = productIterator.flatMap { case t: Traversable[_] => t case single => single :: Nil } @@ -229,26 +229,16 @@ trait Unevaluable extends Expression { * An expression that gets replaced at runtime (currently by the optimizer) into a different * expression for evaluation. This is mainly used to provide compatibility with other databases. * For example, we use this to support "nvl" by replacing it with "coalesce". + * + * A RuntimeReplaceable should have the original parameters along with a "child" expression in the + * case class constructor, and define a normal constructor that accepts only the original + * parameters. For an example, see [[Nvl]]. To make sure the explain plan and expression SQL + * works correctly, the implementation should also override flatArguments method and sql method. */ -trait RuntimeReplaceable extends Unevaluable { - /** - * Method for
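As a SQL-level illustration of the `RuntimeReplaceable` behaviour described in this commit (a hedged sketch; exact plan text varies by version), `nvl` is just an alias that the optimizer replaces with the underlying `coalesce` expression, which is what the new explain check in sql-compatibility-functions.sql verifies:

``` sql
-- Hypothetical spark-sql session, for illustration only.
SELECT nvl(NULL, 2);   -- 2, equivalent to coalesce(NULL, 2)
-- In the optimized plan the alias is gone and only coalesce remains:
EXPLAIN SELECT nvl(id, 0) FROM range(3);
```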
[1/2] spark git commit: Preparing Spark release v1.6.3-rc2
Repository: spark Updated Branches: refs/heads/branch-1.6 82e98f126 -> 9136e2693 Preparing Spark release v1.6.3-rc2 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e860747 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e860747 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e860747 Branch: refs/heads/branch-1.6 Commit: 1e860747458d74a4ccbd081103a0542a2367b14b Parents: 82e98f1 Author: Patrick WendellAuthored: Wed Nov 2 14:45:51 2016 -0700 Committer: Patrick Wendell Committed: Wed Nov 2 14:45:51 2016 -0700 -- R/pkg/DESCRIPTION | 2 +- assembly/pom.xml| 2 +- bagel/pom.xml | 2 +- core/pom.xml| 2 +- docker-integration-tests/pom.xml| 2 +- docs/_config.yml| 4 ++-- examples/pom.xml| 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt-assembly/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml| 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl-assembly/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- launcher/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml| 2 +- pom.xml | 2 +- repl/pom.xml| 2 +- sql/catalyst/pom.xml| 2 +- sql/core/pom.xml| 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml| 2 +- streaming/pom.xml | 2 +- tags/pom.xml| 2 +- tools/pom.xml | 2 +- unsafe/pom.xml | 2 +- yarn/pom.xml| 2 +- 37 files changed, 38 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1e860747/R/pkg/DESCRIPTION -- diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index f237df1..7e9fd2d 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -1,7 +1,7 @@ Package: SparkR Type: Package Title: R frontend for Spark -Version: 1.6.4 +Version: 1.6.3 Date: 2013-09-09 Author: The Apache Software Foundation Maintainer: Shivaram Venkataraman http://git-wip-us.apache.org/repos/asf/spark/blob/1e860747/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 13cf555..8e9747e 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.4-SNAPSHOT +1.6.3 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/1e860747/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index e080625..a73a18a 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.4-SNAPSHOT +1.6.3 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/1e860747/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index f60bd00..c232046 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.4-SNAPSHOT +1.6.3 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/1e860747/docker-integration-tests/pom.xml -- diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml index 8d5eb40..8514b48 100644 --- a/docker-integration-tests/pom.xml +++ b/docker-integration-tests/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 -1.6.4-SNAPSHOT +1.6.3 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/1e860747/docs/_config.yml -- diff --git a/docs/_config.yml b/docs/_config.yml index ee235bc..81dcb2c 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -14,8 +14,8 @@ include: # These allow the 
documentation to be updated with newer releases # of Spark, Scala, and
[2/2] spark git commit: Preparing development version 1.6.4-SNAPSHOT
Preparing development version 1.6.4-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9136e269 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9136e269 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9136e269 Branch: refs/heads/branch-1.6 Commit: 9136e2693d8272fff66dc187e28a310cf7593022 Parents: 1e86074 Author: Patrick WendellAuthored: Wed Nov 2 14:45:57 2016 -0700 Committer: Patrick Wendell Committed: Wed Nov 2 14:45:57 2016 -0700 -- R/pkg/DESCRIPTION | 2 +- assembly/pom.xml| 2 +- bagel/pom.xml | 2 +- core/pom.xml| 2 +- docker-integration-tests/pom.xml| 2 +- docs/_config.yml| 4 ++-- examples/pom.xml| 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt-assembly/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml| 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl-assembly/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- launcher/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml| 2 +- pom.xml | 2 +- repl/pom.xml| 2 +- sql/catalyst/pom.xml| 2 +- sql/core/pom.xml| 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml| 2 +- streaming/pom.xml | 2 +- tags/pom.xml| 2 +- tools/pom.xml | 2 +- unsafe/pom.xml | 2 +- yarn/pom.xml| 2 +- 37 files changed, 38 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9136e269/R/pkg/DESCRIPTION -- diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 7e9fd2d..f237df1 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -1,7 +1,7 @@ Package: SparkR Type: Package Title: R frontend for Spark -Version: 1.6.3 +Version: 1.6.4 Date: 2013-09-09 Author: The Apache Software Foundation Maintainer: Shivaram Venkataraman http://git-wip-us.apache.org/repos/asf/spark/blob/9136e269/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 8e9747e..13cf555 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.3 +1.6.4-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/9136e269/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index a73a18a..e080625 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.3 +1.6.4-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/9136e269/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index c232046..f60bd00 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.3 +1.6.4-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/9136e269/docker-integration-tests/pom.xml -- diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml index 8514b48..8d5eb40 100644 --- a/docker-integration-tests/pom.xml +++ b/docker-integration-tests/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 -1.6.3 +1.6.4-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/9136e269/docs/_config.yml -- diff --git a/docs/_config.yml b/docs/_config.yml index 81dcb2c..ee235bc 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -14,8 +14,8 @@ include: # These allow the documentation to be updated with newer releases # of Spark, Scala, and 
Mesos. -SPARK_VERSION: 1.6.3 -SPARK_VERSION_SHORT: 1.6.3 +SPARK_VERSION: 1.6.4-SNAPSHOT +SPARK_VERSION_SHORT: 1.6.4
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.6.3-rc2 [created] 1e8607474
spark git commit: [SPARK-17058][BUILD] Add maven snapshots-and-staging profile to build/test against staging artifacts
Repository: spark Updated Branches: refs/heads/branch-2.1 bd3ea6595 -> 1eef8e5cd [SPARK-17058][BUILD] Add maven snapshots-and-staging profile to build/test against staging artifacts ## What changes were proposed in this pull request? Adds a `snapshots-and-staging profile` so that RCs of projects like Hadoop and HBase can be used in developer-only build and test runs. There's a comment above the profile telling people not to use this in production. There's no attempt to do the same for SBT, as Ivy is different. ## How was this patch tested? Tested by building against the Hadoop 2.7.3 RC 1 JARs without the profile (and without any local copy of the 2.7.3 artifacts), the build failed ``` mvn install -DskipTests -Pyarn,hadoop-2.7,hive -Dhadoop.version=2.7.3 ... [INFO] [INFO] Building Spark Project Launcher 2.1.0-SNAPSHOT [INFO] Downloading: https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client/2.7.3/hadoop-client-2.7.3.pom [WARNING] The POM for org.apache.hadoop:hadoop-client:jar:2.7.3 is missing, no dependency information available Downloading: https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client/2.7.3/hadoop-client-2.7.3.jar [INFO] [INFO] Reactor Summary: [INFO] [INFO] Spark Project Parent POM ... SUCCESS [ 4.482 s] [INFO] Spark Project Tags . SUCCESS [ 17.402 s] [INFO] Spark Project Sketch ... SUCCESS [ 11.252 s] [INFO] Spark Project Networking ... SUCCESS [ 13.458 s] [INFO] Spark Project Shuffle Streaming Service SUCCESS [ 9.043 s] [INFO] Spark Project Unsafe ... SUCCESS [ 16.027 s] [INFO] Spark Project Launcher . FAILURE [ 1.653 s] [INFO] Spark Project Core . SKIPPED ... ``` With the profile, the build completed ``` mvn install -DskipTests -Pyarn,hadoop-2.7,hive,snapshots-and-staging -Dhadoop.version=2.7.3 ``` Author: Steve LoughranCloses #14646 from steveloughran/stevel/SPARK-17058-support-asf-snapshots. (cherry picked from commit 37d95227a21de602b939dae84943ba007f434513) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1eef8e5c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1eef8e5c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1eef8e5c Branch: refs/heads/branch-2.1 Commit: 1eef8e5cd09dfb8b77044ef9864321618e8ea8c8 Parents: bd3ea65 Author: Steve Loughran Authored: Wed Nov 2 11:52:29 2016 -0700 Committer: Reynold Xin Committed: Wed Nov 2 11:52:38 2016 -0700 -- pom.xml | 48 1 file changed, 48 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1eef8e5c/pom.xml -- diff --git a/pom.xml b/pom.xml index aaf7cfa..04d2eaa 100644 --- a/pom.xml +++ b/pom.xml @@ -2694,6 +2694,54 @@ + + snapshots-and-staging + + + https://repository.apache.org/content/groups/staging/ + https://repository.apache.org/content/repositories/snapshots/ + + + + + ASF Staging + ${asf.staging} + + + ASF Snapshots + ${asf.snapshots} + +true + + +false + + + + + + + ASF Staging + ${asf.staging} + + + ASF Snapshots + ${asf.snapshots} + +true + + +false + + + + + +
spark git commit: [SPARK-17058][BUILD] Add maven snapshots-and-staging profile to build/test against staging artifacts
Repository: spark Updated Branches: refs/heads/master 3c24299b7 -> 37d95227a [SPARK-17058][BUILD] Add maven snapshots-and-staging profile to build/test against staging artifacts ## What changes were proposed in this pull request? Adds a `snapshots-and-staging profile` so that RCs of projects like Hadoop and HBase can be used in developer-only build and test runs. There's a comment above the profile telling people not to use this in production. There's no attempt to do the same for SBT, as Ivy is different. ## How was this patch tested? Tested by building against the Hadoop 2.7.3 RC 1 JARs without the profile (and without any local copy of the 2.7.3 artifacts), the build failed ``` mvn install -DskipTests -Pyarn,hadoop-2.7,hive -Dhadoop.version=2.7.3 ... [INFO] [INFO] Building Spark Project Launcher 2.1.0-SNAPSHOT [INFO] Downloading: https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client/2.7.3/hadoop-client-2.7.3.pom [WARNING] The POM for org.apache.hadoop:hadoop-client:jar:2.7.3 is missing, no dependency information available Downloading: https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client/2.7.3/hadoop-client-2.7.3.jar [INFO] [INFO] Reactor Summary: [INFO] [INFO] Spark Project Parent POM ... SUCCESS [ 4.482 s] [INFO] Spark Project Tags . SUCCESS [ 17.402 s] [INFO] Spark Project Sketch ... SUCCESS [ 11.252 s] [INFO] Spark Project Networking ... SUCCESS [ 13.458 s] [INFO] Spark Project Shuffle Streaming Service SUCCESS [ 9.043 s] [INFO] Spark Project Unsafe ... SUCCESS [ 16.027 s] [INFO] Spark Project Launcher . FAILURE [ 1.653 s] [INFO] Spark Project Core . SKIPPED ... ``` With the profile, the build completed ``` mvn install -DskipTests -Pyarn,hadoop-2.7,hive,snapshots-and-staging -Dhadoop.version=2.7.3 ``` Author: Steve LoughranCloses #14646 from steveloughran/stevel/SPARK-17058-support-asf-snapshots. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37d95227 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37d95227 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37d95227 Branch: refs/heads/master Commit: 37d95227a21de602b939dae84943ba007f434513 Parents: 3c24299 Author: Steve Loughran Authored: Wed Nov 2 11:52:29 2016 -0700 Committer: Reynold Xin Committed: Wed Nov 2 11:52:29 2016 -0700 -- pom.xml | 48 1 file changed, 48 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/37d95227/pom.xml -- diff --git a/pom.xml b/pom.xml index aaf7cfa..04d2eaa 100644 --- a/pom.xml +++ b/pom.xml @@ -2694,6 +2694,54 @@ + + snapshots-and-staging + + + https://repository.apache.org/content/groups/staging/ + https://repository.apache.org/content/repositories/snapshots/ + + + + + ASF Staging + ${asf.staging} + + + ASF Snapshots + ${asf.snapshots} + +true + + +false + + + + + + + ASF Staging + ${asf.staging} + + + ASF Snapshots + ${asf.snapshots} + +true + + +false + + + + + +
spark git commit: [SPARK-18111][SQL] Wrong approximate quantile answer when multiple records have the minimum value(for branch 2.0)
Repository: spark Updated Branches: refs/heads/branch-2.0 1696bcfad -> 3253ae7f7 [SPARK-18111][SQL] Wrong approximate quantile answer when multiple records have the minimum value(for branch 2.0) ## What changes were proposed in this pull request? When multiple records have the minimum value, the answer of `StatFunctions.multipleApproxQuantiles` is wrong. ## How was this patch tested? add a test case Author: wangzhenhuaCloses #15732 from wzhfy/percentile2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3253ae7f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3253ae7f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3253ae7f Branch: refs/heads/branch-2.0 Commit: 3253ae7f722a996cf0af21608e1a27d5d2a12004 Parents: 1696bcf Author: wangzhenhua Authored: Wed Nov 2 11:49:30 2016 -0700 Committer: Reynold Xin Committed: Wed Nov 2 11:49:30 2016 -0700 -- .../spark/sql/execution/stat/StatFunctions.scala | 4 +++- .../org/apache/spark/sql/DataFrameStatSuite.scala | 13 + 2 files changed, 16 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3253ae7f/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index 7e2ebe8..acc42a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -337,7 +337,9 @@ object StatFunctions extends Logging { res.prepend(head) // If necessary, add the minimum element: val currHead = currentSamples.head - if (currHead.value < head.value) { + // don't add the minimum element if `currentSamples` has only one element (both `currHead` and + // `head` point to the same element) + if (currHead.value <= head.value && currentSamples.length > 1) { res.prepend(currentSamples.head) } res.toArray http://git-wip-us.apache.org/repos/asf/spark/blob/3253ae7f/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index 73026c7..571e2ad 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -152,6 +152,19 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext { } } + test("approximate quantile, multiple records with the minimum value in a partition") { +val data = Seq(1, 1, 2, 1, 1, 3, 1, 1, 4, 1, 1, 5) +val df = spark.sparkContext.makeRDD(data, 4).toDF("col") +val epsilons = List(0.1, 0.05, 0.001) +val quantile = 0.5 +val expected = 1 +for (epsilon <- epsilons) { + val Array(answer) = df.stat.approxQuantile("col", Array(quantile), epsilon) + val error = 2 * data.length * epsilon + assert(math.abs(answer - expected) < error) +} + } + test("crosstab") { val rng = new Random() val data = Seq.tabulate(25)(i => (rng.nextInt(5), rng.nextInt(10))) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
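For the SPARK-18111 change above, a minimal sketch of the public API it affects, mirroring the regression test added to `DataFrameStatSuite`; it assumes a local SparkSession and is illustrative only:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[4]").appName("SPARK-18111-sketch").getOrCreate()
import spark.implicits._

// Four partitions, each of which starts with repeated copies of the minimum value 1.
val data = Seq(1, 1, 2, 1, 1, 3, 1, 1, 4, 1, 1, 5)
val df = spark.sparkContext.makeRDD(data, 4).toDF("col")

// With the fix, the approximate median stays within the requested relative error
// even though the minimum value occurs several times per partition.
val Array(median) = df.stat.approxQuantile("col", Array(0.5), 0.05)
assert(math.abs(median - 1) < 2 * data.length * 0.05)
```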
spark git commit: [SPARK-18160][CORE][YARN] spark.files & spark.jars should not be passed to driver in yarn mode
Repository: spark Updated Branches: refs/heads/branch-2.0 eb790c5b1 -> 1696bcfad [SPARK-18160][CORE][YARN] spark.files & spark.jars should not be passed to driver in yarn mode ## What changes were proposed in this pull request? spark.files is still passed to driver in yarn mode, so SparkContext will still handle it which cause the error in the jira desc. ## How was this patch tested? Tested manually in a 5 node cluster. As this issue only happens in multiple node cluster, so I didn't write test for it. Author: Jeff ZhangCloses #15669 from zjffdu/SPARK-18160. (cherry picked from commit 3c24299b71e23e159edbb972347b13430f92a465) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1696bcfa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1696bcfa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1696bcfa Branch: refs/heads/branch-2.0 Commit: 1696bcfadabb91693bf1ab556a321949d1e4fe45 Parents: eb790c5 Author: Jeff Zhang Authored: Wed Nov 2 11:47:45 2016 -0700 Committer: Marcelo Vanzin Committed: Wed Nov 2 11:48:25 2016 -0700 -- .../scala/org/apache/spark/SparkContext.scala | 29 .../org/apache/spark/deploy/yarn/Client.scala | 5 +++- 2 files changed, 10 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1696bcfa/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e9f9d72..43cec70 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1705,29 +1705,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli key = uri.getScheme match { // A JAR file which exists only on the driver node case null | "file" => -if (master == "yarn" && deployMode == "cluster") { - // In order for this to work in yarn cluster mode the user must specify the - // --addJars option to the client to upload the file into the distributed cache - // of the AM to make it show up in the current working directory. - val fileName = new Path(uri.getPath).getName() - try { -env.rpcEnv.fileServer.addJar(new File(fileName)) - } catch { -case e: Exception => - // For now just log an error but allow to go through so spark examples work. - // The spark examples don't really need the jar distributed since its also - // the app jar. 
- logError("Error adding jar (" + e + "), was the --addJars option used?") - null - } -} else { - try { -env.rpcEnv.fileServer.addJar(new File(uri.getPath)) - } catch { -case exc: FileNotFoundException => - logError(s"Jar not found at $path") - null - } +try { + env.rpcEnv.fileServer.addJar(new File(uri.getPath)) +} catch { + case exc: FileNotFoundException => +logError(s"Jar not found at $path") +null } // A JAR file which exists locally on every worker node case "local" => http://git-wip-us.apache.org/repos/asf/spark/blob/1696bcfa/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index a47a64c..981da4b 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1170,7 +1170,10 @@ private object Client extends Logging { // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes System.setProperty("SPARK_YARN_MODE", "true") val sparkConf = new SparkConf - +// SparkSubmit would use yarn cache to distribute files & jars in yarn mode, +// so remove them from sparkConf here for yarn mode. +sparkConf.remove("spark.jars") +sparkConf.remove("spark.files") val args = new ClientArguments(argStrings) new Client(args, sparkConf).run() } - To unsubscribe, e-mail:
spark git commit: [SPARK-18160][CORE][YARN] spark.files & spark.jars should not be passed to driver in yarn mode
Repository: spark Updated Branches: refs/heads/branch-2.1 0093257ea -> bd3ea6595 [SPARK-18160][CORE][YARN] spark.files & spark.jars should not be passed to driver in yarn mode ## What changes were proposed in this pull request? spark.files is still passed to driver in yarn mode, so SparkContext will still handle it which cause the error in the jira desc. ## How was this patch tested? Tested manually in a 5 node cluster. As this issue only happens in multiple node cluster, so I didn't write test for it. Author: Jeff ZhangCloses #15669 from zjffdu/SPARK-18160. (cherry picked from commit 3c24299b71e23e159edbb972347b13430f92a465) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd3ea659 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd3ea659 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd3ea659 Branch: refs/heads/branch-2.1 Commit: bd3ea6595788a4fe5399e6c6c18d8cb6872c Parents: 0093257 Author: Jeff Zhang Authored: Wed Nov 2 11:47:45 2016 -0700 Committer: Marcelo Vanzin Committed: Wed Nov 2 11:48:09 2016 -0700 -- .../scala/org/apache/spark/SparkContext.scala | 29 .../org/apache/spark/deploy/yarn/Client.scala | 5 +++- 2 files changed, 10 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bd3ea659/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4694790..63478c8 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1716,29 +1716,12 @@ class SparkContext(config: SparkConf) extends Logging { key = uri.getScheme match { // A JAR file which exists only on the driver node case null | "file" => -if (master == "yarn" && deployMode == "cluster") { - // In order for this to work in yarn cluster mode the user must specify the - // --addJars option to the client to upload the file into the distributed cache - // of the AM to make it show up in the current working directory. - val fileName = new Path(uri.getPath).getName() - try { -env.rpcEnv.fileServer.addJar(new File(fileName)) - } catch { -case e: Exception => - // For now just log an error but allow to go through so spark examples work. - // The spark examples don't really need the jar distributed since its also - // the app jar. 
- logError("Error adding jar (" + e + "), was the --addJars option used?") - null - } -} else { - try { -env.rpcEnv.fileServer.addJar(new File(uri.getPath)) - } catch { -case exc: FileNotFoundException => - logError(s"Jar not found at $path") - null - } +try { + env.rpcEnv.fileServer.addJar(new File(uri.getPath)) +} catch { + case exc: FileNotFoundException => +logError(s"Jar not found at $path") +null } // A JAR file which exists locally on every worker node case "local" => http://git-wip-us.apache.org/repos/asf/spark/blob/bd3ea659/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 55e4a83..053a786 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1202,7 +1202,10 @@ private object Client extends Logging { // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes System.setProperty("SPARK_YARN_MODE", "true") val sparkConf = new SparkConf - +// SparkSubmit would use yarn cache to distribute files & jars in yarn mode, +// so remove them from sparkConf here for yarn mode. +sparkConf.remove("spark.jars") +sparkConf.remove("spark.files") val args = new ClientArguments(argStrings) new Client(args, sparkConf).run() } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional
spark git commit: [SPARK-18160][CORE][YARN] spark.files & spark.jars should not be passed to driver in yarn mode
Repository: spark Updated Branches: refs/heads/master 02f203107 -> 3c24299b7 [SPARK-18160][CORE][YARN] spark.files & spark.jars should not be passed to driver in yarn mode ## What changes were proposed in this pull request? spark.files is still passed to driver in yarn mode, so SparkContext will still handle it which cause the error in the jira desc. ## How was this patch tested? Tested manually in a 5 node cluster. As this issue only happens in multiple node cluster, so I didn't write test for it. Author: Jeff ZhangCloses #15669 from zjffdu/SPARK-18160. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c24299b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c24299b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c24299b Branch: refs/heads/master Commit: 3c24299b71e23e159edbb972347b13430f92a465 Parents: 02f2031 Author: Jeff Zhang Authored: Wed Nov 2 11:47:45 2016 -0700 Committer: Marcelo Vanzin Committed: Wed Nov 2 11:47:45 2016 -0700 -- .../scala/org/apache/spark/SparkContext.scala | 29 .../org/apache/spark/deploy/yarn/Client.scala | 5 +++- 2 files changed, 10 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3c24299b/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4694790..63478c8 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1716,29 +1716,12 @@ class SparkContext(config: SparkConf) extends Logging { key = uri.getScheme match { // A JAR file which exists only on the driver node case null | "file" => -if (master == "yarn" && deployMode == "cluster") { - // In order for this to work in yarn cluster mode the user must specify the - // --addJars option to the client to upload the file into the distributed cache - // of the AM to make it show up in the current working directory. - val fileName = new Path(uri.getPath).getName() - try { -env.rpcEnv.fileServer.addJar(new File(fileName)) - } catch { -case e: Exception => - // For now just log an error but allow to go through so spark examples work. - // The spark examples don't really need the jar distributed since its also - // the app jar. 
- logError("Error adding jar (" + e + "), was the --addJars option used?") - null - } -} else { - try { -env.rpcEnv.fileServer.addJar(new File(uri.getPath)) - } catch { -case exc: FileNotFoundException => - logError(s"Jar not found at $path") - null - } +try { + env.rpcEnv.fileServer.addJar(new File(uri.getPath)) +} catch { + case exc: FileNotFoundException => +logError(s"Jar not found at $path") +null } // A JAR file which exists locally on every worker node case "local" => http://git-wip-us.apache.org/repos/asf/spark/blob/3c24299b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 55e4a83..053a786 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1202,7 +1202,10 @@ private object Client extends Logging { // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes System.setProperty("SPARK_YARN_MODE", "true") val sparkConf = new SparkConf - +// SparkSubmit would use yarn cache to distribute files & jars in yarn mode, +// so remove them from sparkConf here for yarn mode. +sparkConf.remove("spark.jars") +sparkConf.remove("spark.files") val args = new ClientArguments(argStrings) new Client(args, sparkConf).run() } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
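To make the SPARK-18160 symptom above concrete, a hedged sketch (the jar path is hypothetical) of why yarn's `Client` now strips these keys, matching the diff:

```scala
import org.apache.spark.SparkConf

// In yarn mode, SparkSubmit already distributes --jars/--files through the YARN
// cache. Before this fix the same entries were still passed to the driver, so
// SparkContext tried to add them again on the driver node, where a client-local
// path such as the one below does not exist ("Jar not found at ...").
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.jars", "/home/alice/libs/extra.jar") // hypothetical client-only path

// What the patched Client now does before building ClientArguments:
conf.remove("spark.jars")
conf.remove("spark.files")
```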
spark git commit: [SPARK-17895] Improve doc for rangeBetween and rowsBetween
Repository: spark Updated Branches: refs/heads/master 4af0ce2d9 -> 742e0fea5 [SPARK-17895] Improve doc for rangeBetween and rowsBetween ## What changes were proposed in this pull request? Copied description for row and range based frame boundary from https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala#L56 Added examples to show different behavior of rangeBetween and rowsBetween when involving duplicate values. Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request. Author: buzhihuojieCloses #15727 from david-weiluo-ren/improveDocForRangeAndRowsBetween. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/742e0fea Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/742e0fea Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/742e0fea Branch: refs/heads/master Commit: 742e0fea5391857964e90d396641ecf95cac4248 Parents: 4af0ce2 Author: buzhihuojie Authored: Wed Nov 2 11:36:20 2016 -0700 Committer: Reynold Xin Committed: Wed Nov 2 11:36:20 2016 -0700 -- .../apache/spark/sql/expressions/Window.scala | 55 .../spark/sql/expressions/WindowSpec.scala | 55 2 files changed, 110 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/742e0fea/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala index 0b26d86..327bc37 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala @@ -121,6 +121,32 @@ object Window { * and [[Window.currentRow]] to specify special boundary values, rather than using integral * values directly. * + * A row based boundary is based on the position of the row within the partition. + * An offset indicates the number of rows above or below the current row, the frame for the + * current row starts or ends. For instance, given a row based sliding frame with a lower bound + * offset of -1 and a upper bound offset of +2. The frame for row with index 5 would range from + * index 4 to index 6. + * + * {{{ + * import org.apache.spark.sql.expressions.Window + * val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")) + * .toDF("id", "category") + * df.withColumn("sum", + * sum('id) over Window.partitionBy('category).orderBy('id).rowsBetween(0,1)) + * .show() + * + * +---++---+ + * | id|category|sum| + * +---++---+ + * | 1| b| 3| + * | 2| b| 5| + * | 3| b| 3| + * | 1| a| 2| + * | 1| a| 3| + * | 2| a| 2| + * +---++---+ + * }}} + * * @param start boundary start, inclusive. The frame is unbounded if this is * the minimum long value ([[Window.unboundedPreceding]]). * @param end boundary end, inclusive. The frame is unbounded if this is the @@ -144,6 +170,35 @@ object Window { * and [[Window.currentRow]] to specify special boundary values, rather than using integral * values directly. * + * A range based boundary is based on the actual value of the ORDER BY + * expression(s). An offset is used to alter the value of the ORDER BY expression, for + * instance if the current order by expression has a value of 10 and the lower bound offset + * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. 
This however puts a + * number of constraints on the ORDER BY expressions: there can be only one expression and this + * expression must have a numerical data type. An exception can be made when the offset is 0, + * because no value modification is needed, in this case multiple and non-numeric ORDER BY + * expression are allowed. + * + * {{{ + * import org.apache.spark.sql.expressions.Window + * val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")) + * .toDF("id", "category") + * df.withColumn("sum", + * sum('id) over Window.partitionBy('category).orderBy('id).rangeBetween(0,1)) + * .show() + * + * +---++---+ + * | id|category|sum| + * +---++---+ + * | 1| b| 3| + * | 2| b| 5| + * | 3| b| 3| + * | 1| a| 4| + * | 1| a| 4| + * | 2| a| 2| + * +---++---+ + * }}} + *
spark git commit: [SPARK-14393][SQL] values generated by non-deterministic functions shouldn't change after coalesce or union
Repository: spark Updated Branches: refs/heads/branch-2.1 a885d5bbc -> 0093257ea [SPARK-14393][SQL] values generated by non-deterministic functions shouldn't change after coalesce or union ## What changes were proposed in this pull request? When a user appended a column using a "nondeterministic" function to a DataFrame, e.g., `rand`, `randn`, and `monotonically_increasing_id`, the expected semantic is the following: - The value in each row should remain unchanged, as if we materialize the column immediately, regardless of later DataFrame operations. However, since we use `TaskContext.getPartitionId` to get the partition index from the current thread, the values from nondeterministic columns might change if we call `union` or `coalesce` after. `TaskContext.getPartitionId` returns the partition index of the current Spark task, which might not be the corresponding partition index of the DataFrame where we defined the column. See the unit tests below or JIRA for examples. This PR uses the partition index from `RDD.mapPartitionWithIndex` instead of `TaskContext` and fixes the partition initialization logic in whole-stage codegen, normal codegen, and codegen fallback. `initializeStatesForPartition(partitionIndex: Int)` was added to `Projection`, `Nondeterministic`, and `Predicate` (codegen) and initialized right after object creation in `mapPartitionWithIndex`. `newPredicate` now returns a `Predicate` instance rather than a function for proper initialization. ## How was this patch tested? Unit tests. (Actually I'm not very confident that this PR fixed all issues without introducing new ones ...) cc: rxin davies Author: Xiangrui MengCloses #15567 from mengxr/SPARK-14393. (cherry picked from commit 02f203107b8eda1f1576e36c4f12b0e3bc5e910e) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0093257e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0093257e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0093257e Branch: refs/heads/branch-2.1 Commit: 0093257ea94d3a197ca061b54c04685d7c1f616a Parents: a885d5b Author: Xiangrui Meng Authored: Wed Nov 2 11:41:49 2016 -0700 Committer: Reynold Xin Committed: Wed Nov 2 11:42:01 2016 -0700 -- .../main/scala/org/apache/spark/rdd/RDD.scala | 16 +- .../sql/catalyst/expressions/Expression.scala | 19 +-- .../catalyst/expressions/InputFileName.scala| 2 +- .../expressions/MonotonicallyIncreasingID.scala | 11 +++-- .../sql/catalyst/expressions/Projection.scala | 22 ++--- .../catalyst/expressions/SparkPartitionID.scala | 13 +++-- .../expressions/codegen/CodeGenerator.scala | 14 ++ .../expressions/codegen/CodegenFallback.scala | 18 +-- .../codegen/GenerateMutableProjection.scala | 4 ++ .../expressions/codegen/GeneratePredicate.scala | 18 +-- .../codegen/GenerateSafeProjection.scala| 4 ++ .../codegen/GenerateUnsafeProjection.scala | 4 ++ .../sql/catalyst/expressions/package.scala | 10 +++- .../sql/catalyst/expressions/predicates.scala | 4 -- .../expressions/randomExpressions.scala | 14 +++--- .../sql/catalyst/optimizer/Optimizer.scala | 1 + .../expressions/ExpressionEvalHelper.scala | 5 +- .../codegen/CodegenExpressionCachingSuite.scala | 13 +++-- .../sql/execution/DataSourceScanExec.scala | 6 ++- .../spark/sql/execution/ExistingRDD.scala | 3 +- .../spark/sql/execution/GenerateExec.scala | 3 +- .../apache/spark/sql/execution/SparkPlan.scala | 4 +- .../sql/execution/WholeStageCodegenExec.scala | 8 ++- .../sql/execution/basicPhysicalOperators.scala | 8 +-- 
.../columnar/InMemoryTableScanExec.scala| 5 +- .../joins/BroadcastNestedLoopJoinExec.scala | 7 +-- .../execution/joins/CartesianProductExec.scala | 8 +-- .../spark/sql/execution/joins/HashJoin.scala| 2 +- .../sql/execution/joins/SortMergeJoinExec.scala | 2 +- .../apache/spark/sql/execution/objects.scala| 6 ++- .../spark/sql/DataFrameFunctionsSuite.scala | 52 .../sql/hive/execution/HiveTableScanExec.scala | 3 +- 32 files changed, 231 insertions(+), 78 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0093257e/core/src/main/scala/org/apache/spark/rdd/RDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index db535de..e018af3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -788,14 +788,26 @@ abstract class RDD[T: ClassTag]( } /** -
spark git commit: [SPARK-14393][SQL] values generated by non-deterministic functions shouldn't change after coalesce or union
Repository: spark Updated Branches: refs/heads/master 742e0fea5 -> 02f203107 [SPARK-14393][SQL] values generated by non-deterministic functions shouldn't change after coalesce or union ## What changes were proposed in this pull request? When a user appended a column using a "nondeterministic" function to a DataFrame, e.g., `rand`, `randn`, and `monotonically_increasing_id`, the expected semantic is the following: - The value in each row should remain unchanged, as if we materialize the column immediately, regardless of later DataFrame operations. However, since we use `TaskContext.getPartitionId` to get the partition index from the current thread, the values from nondeterministic columns might change if we call `union` or `coalesce` after. `TaskContext.getPartitionId` returns the partition index of the current Spark task, which might not be the corresponding partition index of the DataFrame where we defined the column. See the unit tests below or JIRA for examples. This PR uses the partition index from `RDD.mapPartitionWithIndex` instead of `TaskContext` and fixes the partition initialization logic in whole-stage codegen, normal codegen, and codegen fallback. `initializeStatesForPartition(partitionIndex: Int)` was added to `Projection`, `Nondeterministic`, and `Predicate` (codegen) and initialized right after object creation in `mapPartitionWithIndex`. `newPredicate` now returns a `Predicate` instance rather than a function for proper initialization. ## How was this patch tested? Unit tests. (Actually I'm not very confident that this PR fixed all issues without introducing new ones ...) cc: rxin davies Author: Xiangrui MengCloses #15567 from mengxr/SPARK-14393. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02f20310 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02f20310 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02f20310 Branch: refs/heads/master Commit: 02f203107b8eda1f1576e36c4f12b0e3bc5e910e Parents: 742e0fe Author: Xiangrui Meng Authored: Wed Nov 2 11:41:49 2016 -0700 Committer: Reynold Xin Committed: Wed Nov 2 11:41:49 2016 -0700 -- .../main/scala/org/apache/spark/rdd/RDD.scala | 16 +- .../sql/catalyst/expressions/Expression.scala | 19 +-- .../catalyst/expressions/InputFileName.scala| 2 +- .../expressions/MonotonicallyIncreasingID.scala | 11 +++-- .../sql/catalyst/expressions/Projection.scala | 22 ++--- .../catalyst/expressions/SparkPartitionID.scala | 13 +++-- .../expressions/codegen/CodeGenerator.scala | 14 ++ .../expressions/codegen/CodegenFallback.scala | 18 +-- .../codegen/GenerateMutableProjection.scala | 4 ++ .../expressions/codegen/GeneratePredicate.scala | 18 +-- .../codegen/GenerateSafeProjection.scala| 4 ++ .../codegen/GenerateUnsafeProjection.scala | 4 ++ .../sql/catalyst/expressions/package.scala | 10 +++- .../sql/catalyst/expressions/predicates.scala | 4 -- .../expressions/randomExpressions.scala | 14 +++--- .../sql/catalyst/optimizer/Optimizer.scala | 1 + .../expressions/ExpressionEvalHelper.scala | 5 +- .../codegen/CodegenExpressionCachingSuite.scala | 13 +++-- .../sql/execution/DataSourceScanExec.scala | 6 ++- .../spark/sql/execution/ExistingRDD.scala | 3 +- .../spark/sql/execution/GenerateExec.scala | 3 +- .../apache/spark/sql/execution/SparkPlan.scala | 4 +- .../sql/execution/WholeStageCodegenExec.scala | 8 ++- .../sql/execution/basicPhysicalOperators.scala | 8 +-- .../columnar/InMemoryTableScanExec.scala| 5 +- .../joins/BroadcastNestedLoopJoinExec.scala | 7 +-- 
.../execution/joins/CartesianProductExec.scala | 8 +-- .../spark/sql/execution/joins/HashJoin.scala| 2 +- .../sql/execution/joins/SortMergeJoinExec.scala | 2 +- .../apache/spark/sql/execution/objects.scala| 6 ++- .../spark/sql/DataFrameFunctionsSuite.scala | 52 .../sql/hive/execution/HiveTableScanExec.scala | 3 +- 32 files changed, 231 insertions(+), 78 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/02f20310/core/src/main/scala/org/apache/spark/rdd/RDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index db535de..e018af3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -788,14 +788,26 @@ abstract class RDD[T: ClassTag]( } /** - * [performance] Spark's internal mapPartitions method which skips closure cleaning. It is a - * performance API to be used
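A short sketch of the user-facing expectation that SPARK-14393 above enforces (assumes a SparkSession named `spark`); it only demonstrates the scenario, while the authoritative checks are the new `DataFrameFunctionsSuite` tests:

```scala
import org.apache.spark.sql.functions._

// Append a nondeterministic column, then apply operations that move rows
// between partitions.
val df = spark.range(0, 10, 1, 2).toDF("x")
  .withColumn("id", monotonically_increasing_id())

// Expected semantic after this change: the generated values behave as if the
// column had been materialized immediately, so a later union or coalesce must
// not re-derive them from a different task/partition index.
df.union(df).show()
df.coalesce(1).show()
```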
spark git commit: [SPARK-17895] Improve doc for rangeBetween and rowsBetween
Repository: spark Updated Branches: refs/heads/branch-2.1 9be069125 -> a885d5bbc [SPARK-17895] Improve doc for rangeBetween and rowsBetween ## What changes were proposed in this pull request? Copied description for row and range based frame boundary from https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala#L56 Added examples to show different behavior of rangeBetween and rowsBetween when involving duplicate values. Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request. Author: buzhihuojieCloses #15727 from david-weiluo-ren/improveDocForRangeAndRowsBetween. (cherry picked from commit 742e0fea5391857964e90d396641ecf95cac4248) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a885d5bb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a885d5bb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a885d5bb Branch: refs/heads/branch-2.1 Commit: a885d5bbce9dba66b394850b3aac51ae97cb18dd Parents: 9be0691 Author: buzhihuojie Authored: Wed Nov 2 11:36:20 2016 -0700 Committer: Reynold Xin Committed: Wed Nov 2 11:36:26 2016 -0700 -- .../apache/spark/sql/expressions/Window.scala | 55 .../spark/sql/expressions/WindowSpec.scala | 55 2 files changed, 110 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a885d5bb/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala index 0b26d86..327bc37 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala @@ -121,6 +121,32 @@ object Window { * and [[Window.currentRow]] to specify special boundary values, rather than using integral * values directly. * + * A row based boundary is based on the position of the row within the partition. + * An offset indicates the number of rows above or below the current row, the frame for the + * current row starts or ends. For instance, given a row based sliding frame with a lower bound + * offset of -1 and a upper bound offset of +2. The frame for row with index 5 would range from + * index 4 to index 6. + * + * {{{ + * import org.apache.spark.sql.expressions.Window + * val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")) + * .toDF("id", "category") + * df.withColumn("sum", + * sum('id) over Window.partitionBy('category).orderBy('id).rowsBetween(0,1)) + * .show() + * + * +---++---+ + * | id|category|sum| + * +---++---+ + * | 1| b| 3| + * | 2| b| 5| + * | 3| b| 3| + * | 1| a| 2| + * | 1| a| 3| + * | 2| a| 2| + * +---++---+ + * }}} + * * @param start boundary start, inclusive. The frame is unbounded if this is * the minimum long value ([[Window.unboundedPreceding]]). * @param end boundary end, inclusive. The frame is unbounded if this is the @@ -144,6 +170,35 @@ object Window { * and [[Window.currentRow]] to specify special boundary values, rather than using integral * values directly. * + * A range based boundary is based on the actual value of the ORDER BY + * expression(s). 
An offset is used to alter the value of the ORDER BY expression, for + * instance if the current order by expression has a value of 10 and the lower bound offset + * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a + * number of constraints on the ORDER BY expressions: there can be only one expression and this + * expression must have a numerical data type. An exception can be made when the offset is 0, + * because no value modification is needed, in this case multiple and non-numeric ORDER BY + * expression are allowed. + * + * {{{ + * import org.apache.spark.sql.expressions.Window + * val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")) + * .toDF("id", "category") + * df.withColumn("sum", + * sum('id) over Window.partitionBy('category).orderBy('id).rangeBetween(0,1)) + * .show() + * + * +---++---+ + * | id|category|sum| + * +---++---+ + * | 1| b| 3| + * | 2| b| 5| + * | 3| b|
spark git commit: [SPARK-17683][SQL] Support ArrayType in Literal.apply
Repository: spark Updated Branches: refs/heads/branch-2.1 41491e540 -> 9be069125 [SPARK-17683][SQL] Support ArrayType in Literal.apply ## What changes were proposed in this pull request? This pr is to add pattern-matching entries for array data in `Literal.apply`. ## How was this patch tested? Added tests in `LiteralExpressionSuite`. Author: Takeshi YAMAMUROCloses #15257 from maropu/SPARK-17683. (cherry picked from commit 4af0ce2d96de3397c9bc05684cad290a52486577) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9be06912 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9be06912 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9be06912 Branch: refs/heads/branch-2.1 Commit: 9be069125f7e94df9d862f307b87965baf9416e3 Parents: 41491e5 Author: Takeshi YAMAMURO Authored: Wed Nov 2 11:29:26 2016 -0700 Committer: Reynold Xin Committed: Wed Nov 2 11:29:39 2016 -0700 -- .../sql/catalyst/expressions/literals.scala | 57 +++- .../expressions/LiteralExpressionSuite.scala| 27 +- 2 files changed, 82 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9be06912/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala index a597a17..1985e68 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala @@ -17,14 +17,25 @@ package org.apache.spark.sql.catalyst.expressions +import java.lang.{Boolean => JavaBoolean} +import java.lang.{Byte => JavaByte} +import java.lang.{Double => JavaDouble} +import java.lang.{Float => JavaFloat} +import java.lang.{Integer => JavaInteger} +import java.lang.{Long => JavaLong} +import java.lang.{Short => JavaShort} +import java.math.{BigDecimal => JavaBigDecimal} import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import java.util import java.util.Objects import javax.xml.bind.DatatypeConverter +import scala.math.{BigDecimal, BigInt} + import org.json4s.JsonAST._ +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -46,12 +57,17 @@ object Literal { case s: String => Literal(UTF8String.fromString(s), StringType) case b: Boolean => Literal(b, BooleanType) case d: BigDecimal => Literal(Decimal(d), DecimalType(Math.max(d.precision, d.scale), d.scale)) -case d: java.math.BigDecimal => +case d: JavaBigDecimal => Literal(Decimal(d), DecimalType(Math.max(d.precision, d.scale), d.scale())) case d: Decimal => Literal(d, DecimalType(Math.max(d.precision, d.scale), d.scale)) case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType) case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType) case a: Array[Byte] => Literal(a, BinaryType) +case a: Array[_] => + val elementType = componentTypeToDataType(a.getClass.getComponentType()) + val dataType = ArrayType(elementType) + val convert = CatalystTypeConverters.createToCatalystConverter(dataType) + Literal(convert(a), dataType) case i: CalendarInterval => Literal(i, CalendarIntervalType) case null => Literal(null, NullType) case v: Literal 
=> v @@ -60,6 +76,45 @@ object Literal { } /** + * Returns the Spark SQL DataType for a given class object. Since this type needs to be resolved + * in runtime, we use match-case idioms for class objects here. However, there are similar + * functions in other files (e.g., HiveInspectors), so these functions need to merged into one. + */ + private[this] def componentTypeToDataType(clz: Class[_]): DataType = clz match { +// primitive types +case JavaShort.TYPE => ShortType +case JavaInteger.TYPE => IntegerType +case JavaLong.TYPE => LongType +case JavaDouble.TYPE => DoubleType +case JavaByte.TYPE => ByteType +case JavaFloat.TYPE => FloatType +case JavaBoolean.TYPE => BooleanType + +// java classes +case _ if clz == classOf[Date] => DateType +case _ if clz == classOf[Timestamp] => TimestampType +case _ if clz == classOf[JavaBigDecimal] => DecimalType.SYSTEM_DEFAULT +case _ if clz ==
spark git commit: [SPARK-17683][SQL] Support ArrayType in Literal.apply
Repository: spark Updated Branches: refs/heads/master f151bd1af -> 4af0ce2d9 [SPARK-17683][SQL] Support ArrayType in Literal.apply ## What changes were proposed in this pull request? This pr is to add pattern-matching entries for array data in `Literal.apply`. ## How was this patch tested? Added tests in `LiteralExpressionSuite`. Author: Takeshi YAMAMUROCloses #15257 from maropu/SPARK-17683. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4af0ce2d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4af0ce2d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4af0ce2d Branch: refs/heads/master Commit: 4af0ce2d96de3397c9bc05684cad290a52486577 Parents: f151bd1 Author: Takeshi YAMAMURO Authored: Wed Nov 2 11:29:26 2016 -0700 Committer: Reynold Xin Committed: Wed Nov 2 11:29:26 2016 -0700 -- .../sql/catalyst/expressions/literals.scala | 57 +++- .../expressions/LiteralExpressionSuite.scala| 27 +- 2 files changed, 82 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4af0ce2d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala index a597a17..1985e68 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala @@ -17,14 +17,25 @@ package org.apache.spark.sql.catalyst.expressions +import java.lang.{Boolean => JavaBoolean} +import java.lang.{Byte => JavaByte} +import java.lang.{Double => JavaDouble} +import java.lang.{Float => JavaFloat} +import java.lang.{Integer => JavaInteger} +import java.lang.{Long => JavaLong} +import java.lang.{Short => JavaShort} +import java.math.{BigDecimal => JavaBigDecimal} import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import java.util import java.util.Objects import javax.xml.bind.DatatypeConverter +import scala.math.{BigDecimal, BigInt} + import org.json4s.JsonAST._ +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -46,12 +57,17 @@ object Literal { case s: String => Literal(UTF8String.fromString(s), StringType) case b: Boolean => Literal(b, BooleanType) case d: BigDecimal => Literal(Decimal(d), DecimalType(Math.max(d.precision, d.scale), d.scale)) -case d: java.math.BigDecimal => +case d: JavaBigDecimal => Literal(Decimal(d), DecimalType(Math.max(d.precision, d.scale), d.scale())) case d: Decimal => Literal(d, DecimalType(Math.max(d.precision, d.scale), d.scale)) case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType) case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType) case a: Array[Byte] => Literal(a, BinaryType) +case a: Array[_] => + val elementType = componentTypeToDataType(a.getClass.getComponentType()) + val dataType = ArrayType(elementType) + val convert = CatalystTypeConverters.createToCatalystConverter(dataType) + Literal(convert(a), dataType) case i: CalendarInterval => Literal(i, CalendarIntervalType) case null => Literal(null, NullType) case v: Literal => v @@ -60,6 +76,45 @@ object Literal { } /** + * Returns the Spark SQL DataType for a given class 
object. Since this type needs to be resolved + * in runtime, we use match-case idioms for class objects here. However, there are similar + * functions in other files (e.g., HiveInspectors), so these functions need to merged into one. + */ + private[this] def componentTypeToDataType(clz: Class[_]): DataType = clz match { +// primitive types +case JavaShort.TYPE => ShortType +case JavaInteger.TYPE => IntegerType +case JavaLong.TYPE => LongType +case JavaDouble.TYPE => DoubleType +case JavaByte.TYPE => ByteType +case JavaFloat.TYPE => FloatType +case JavaBoolean.TYPE => BooleanType + +// java classes +case _ if clz == classOf[Date] => DateType +case _ if clz == classOf[Timestamp] => TimestampType +case _ if clz == classOf[JavaBigDecimal] => DecimalType.SYSTEM_DEFAULT +case _ if clz == classOf[Array[Byte]] => BinaryType +case _ if clz == classOf[JavaShort] => ShortType +case _ if clz == classOf[JavaInteger] =>
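A brief sketch of what the new SPARK-17683 match entries above enable (`Literal` is a Catalyst-internal API, shown only for illustration); the same `Literal.apply` path also backs the public `org.apache.spark.sql.functions.lit`:

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.{ArrayType, IntegerType}

// Before this change an Array argument fell through to the unsupported-literal
// error; with the added pattern matches it resolves to an array literal.
val arrayLiteral = Literal(Array(1, 2, 3))
assert(arrayLiteral.dataType == ArrayType(IntegerType))
```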
spark git commit: [SPARK-16796][WEB UI] Mask spark.authenticate.secret on Spark environ…
Repository: spark Updated Branches: refs/heads/branch-2.0 09178b6ee -> eb790c5b1 [SPARK-16796][WEB UI] Mask spark.authenticate.secret on Spark environ… ## What changes were proposed in this pull request? Mask `spark.authenticate.secret` on Spark environment page (Web UI). This is addition to https://github.com/apache/spark/pull/14409 ## How was this patch tested? `./dev/run-tests` [info] ScalaTest [info] Run completed in 1 hour, 8 minutes, 38 seconds. [info] Total number of tests run: 2166 [info] Suites: completed 65, aborted 0 [info] Tests: succeeded 2166, failed 0, canceled 0, ignored 590, pending 0 [info] All tests passed. Author: Artur SukhenkoCloses #14484 from Devian-ua/SPARK-16796. (cherry picked from commit 14dba45208d8a5511be2cf8ddf22e688ef141e88) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb790c5b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb790c5b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb790c5b Branch: refs/heads/branch-2.0 Commit: eb790c5b110c1227d0d6c8593254de196f65da35 Parents: 09178b6 Author: Artur Sukhenko Authored: Sat Aug 6 04:41:47 2016 +0100 Committer: Sean Owen Committed: Wed Nov 2 18:06:16 2016 + -- .../src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/eb790c5b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala index 22136a6..9f6e9a6 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala @@ -27,7 +27,9 @@ private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") private val listener = parent.listener private def removePass(kv: (String, String)): (String, String) = { -if (kv._1.toLowerCase.contains("password")) (kv._1, "**") else kv +if (kv._1.toLowerCase.contains("password") || kv._1.toLowerCase.contains("secret")) { + (kv._1, "**") +} else kv } def render(request: HttpServletRequest): Seq[Node] = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
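For clarity, the SPARK-16796 masking rule above condensed into a standalone sketch, with hypothetical key/value pairs:

```scala
// Same predicate as the patched EnvironmentPage.removePass: any key containing
// "password" or "secret" (case-insensitive) has its value replaced with "**".
def removePass(kv: (String, String)): (String, String) =
  if (kv._1.toLowerCase.contains("password") || kv._1.toLowerCase.contains("secret")) {
    (kv._1, "**")
  } else kv

removePass(("spark.authenticate.secret", "s3cr3t")) // -> ("spark.authenticate.secret", "**")
removePass(("spark.executor.memory", "4g"))         // -> unchanged
```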
spark git commit: [SPARK-16839][SQL] Simplify Struct creation code path
Repository: spark Updated Branches: refs/heads/branch-2.1 176afa5e8 -> 41491e540 [SPARK-16839][SQL] Simplify Struct creation code path ## What changes were proposed in this pull request? Simplify struct creation, especially the aspect of `CleanupAliases` which missed some aliases when handling trees created by `CreateStruct`. This PR includes: 1. A failing test (create struct with nested aliases, some of the aliases survive `CleanupAliases`). 2. A fix that transforms `CreateStruct` into a `CreateNamedStruct` constructor, effectively eliminating `CreateStruct` from all expression trees. 3. A `NamePlaceHolder` used by `CreateStruct` when column names cannot be extracted from unresolved `NamedExpression`. 4. A new Analyzer rule that resolves `NamePlaceHolder` into a string literal once the `NamedExpression` is resolved. 5. `CleanupAliases` code was simplified as it no longer has to deal with `CreateStruct`'s top level columns. ## How was this patch tested? Running all tests-suits in package org.apache.spark.sql, especially including the analysis suite, making sure added test initially fails, after applying suggested fix rerun the entire analysis package successfully. Modified few tests that expected `CreateStruct` which is now transformed into `CreateNamedStruct`. Author: eyal farago Author: Herman van HovellAuthor: eyal farago Author: Eyal Farago Author: Hyukjin Kwon Author: eyalfa Closes #15718 from hvanhovell/SPARK-16839-2. (cherry picked from commit f151bd1af8a05d4b6c901ebe6ac0b51a4a1a20df) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/41491e54 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/41491e54 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/41491e54 Branch: refs/heads/branch-2.1 Commit: 41491e54080742f6e4a1e80a72cd9f46a9336e31 Parents: 176afa5 Author: eyal farago Authored: Wed Nov 2 11:12:20 2016 +0100 Committer: Herman van Hovell Committed: Wed Nov 2 11:12:35 2016 +0100 -- R/pkg/inst/tests/testthat/test_sparkSQL.R | 12 +- .../spark/sql/catalyst/analysis/Analyzer.scala | 53 ++--- .../catalyst/analysis/FunctionRegistry.scala| 2 +- .../sql/catalyst/expressions/Projection.scala | 2 - .../expressions/complexTypeCreator.scala| 212 +++ .../spark/sql/catalyst/parser/AstBuilder.scala | 4 +- .../sql/catalyst/analysis/AnalysisSuite.scala | 38 +++- .../catalyst/expressions/ComplexTypeSuite.scala | 1 - .../scala/org/apache/spark/sql/Column.scala | 3 + .../command/AnalyzeColumnCommand.scala | 4 +- .../sql-tests/results/group-by.sql.out | 2 +- .../apache/spark/sql/hive/test/TestHive.scala | 20 +- .../resources/sqlgen/subquery_in_having_2.sql | 2 +- .../sql/catalyst/LogicalPlanToSQLSuite.scala| 12 +- 14 files changed, 169 insertions(+), 198 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/41491e54/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 806019d..d7fe6b3 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1222,16 +1222,16 @@ test_that("column functions", { # Test struct() df <- createDataFrame(list(list(1L, 2L, 3L), list(4L, 5L, 6L)), schema = c("a", "b", "c")) - result <- collect(select(df, struct("a", "c"))) + result <- collect(select(df, alias(struct("a", "c"), "d"))) expected <- data.frame(row.names = 1:2) - expected$"struct(a, c)" <- list(listToStruct(list(a = 1L, c = 3L)), - 
listToStruct(list(a = 4L, c = 6L))) + expected$"d" <- list(listToStruct(list(a = 1L, c = 3L)), + listToStruct(list(a = 4L, c = 6L))) expect_equal(result, expected) - result <- collect(select(df, struct(df$a, df$b))) + result <- collect(select(df, alias(struct(df$a, df$b), "d"))) expected <- data.frame(row.names = 1:2) - expected$"struct(a, b)" <- list(listToStruct(list(a = 1L, b = 2L)), - listToStruct(list(a = 4L, b = 5L))) + expected$"d" <- list(listToStruct(list(a = 1L, b = 2L)), + listToStruct(list(a = 4L, b = 5L))) expect_equal(result, expected) # Test encode(), decode() http://git-wip-us.apache.org/repos/asf/spark/blob/41491e54/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
spark git commit: [SPARK-16839][SQL] Simplify Struct creation code path
Repository: spark Updated Branches: refs/heads/master 9c8deef64 -> f151bd1af [SPARK-16839][SQL] Simplify Struct creation code path ## What changes were proposed in this pull request? Simplify struct creation, especially the aspect of `CleanupAliases` which missed some aliases when handling trees created by `CreateStruct`. This PR includes: 1. A failing test (create struct with nested aliases, some of the aliases survive `CleanupAliases`). 2. A fix that transforms `CreateStruct` into a `CreateNamedStruct` constructor, effectively eliminating `CreateStruct` from all expression trees. 3. A `NamePlaceHolder` used by `CreateStruct` when column names cannot be extracted from unresolved `NamedExpression`. 4. A new Analyzer rule that resolves `NamePlaceHolder` into a string literal once the `NamedExpression` is resolved. 5. `CleanupAliases` code was simplified as it no longer has to deal with `CreateStruct`'s top level columns. ## How was this patch tested? Running all tests-suits in package org.apache.spark.sql, especially including the analysis suite, making sure added test initially fails, after applying suggested fix rerun the entire analysis package successfully. Modified few tests that expected `CreateStruct` which is now transformed into `CreateNamedStruct`. Author: eyal farago Author: Herman van HovellAuthor: eyal farago Author: Eyal Farago Author: Hyukjin Kwon Author: eyalfa Closes #15718 from hvanhovell/SPARK-16839-2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f151bd1a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f151bd1a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f151bd1a Branch: refs/heads/master Commit: f151bd1af8a05d4b6c901ebe6ac0b51a4a1a20df Parents: 9c8deef Author: eyal farago Authored: Wed Nov 2 11:12:20 2016 +0100 Committer: Herman van Hovell Committed: Wed Nov 2 11:12:20 2016 +0100 -- R/pkg/inst/tests/testthat/test_sparkSQL.R | 12 +- .../spark/sql/catalyst/analysis/Analyzer.scala | 53 ++--- .../catalyst/analysis/FunctionRegistry.scala| 2 +- .../sql/catalyst/expressions/Projection.scala | 2 - .../expressions/complexTypeCreator.scala| 212 +++ .../spark/sql/catalyst/parser/AstBuilder.scala | 4 +- .../sql/catalyst/analysis/AnalysisSuite.scala | 38 +++- .../catalyst/expressions/ComplexTypeSuite.scala | 1 - .../scala/org/apache/spark/sql/Column.scala | 3 + .../command/AnalyzeColumnCommand.scala | 4 +- .../sql-tests/results/group-by.sql.out | 2 +- .../apache/spark/sql/hive/test/TestHive.scala | 20 +- .../resources/sqlgen/subquery_in_having_2.sql | 2 +- .../sql/catalyst/LogicalPlanToSQLSuite.scala| 12 +- 14 files changed, 169 insertions(+), 198 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f151bd1a/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 806019d..d7fe6b3 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1222,16 +1222,16 @@ test_that("column functions", { # Test struct() df <- createDataFrame(list(list(1L, 2L, 3L), list(4L, 5L, 6L)), schema = c("a", "b", "c")) - result <- collect(select(df, struct("a", "c"))) + result <- collect(select(df, alias(struct("a", "c"), "d"))) expected <- data.frame(row.names = 1:2) - expected$"struct(a, c)" <- list(listToStruct(list(a = 1L, c = 3L)), - listToStruct(list(a = 4L, c = 6L))) + expected$"d" <- list(listToStruct(list(a = 1L, c = 3L)), + listToStruct(list(a = 4L, c = 
6L))) expect_equal(result, expected) - result <- collect(select(df, struct(df$a, df$b))) + result <- collect(select(df, alias(struct(df$a, df$b), "d"))) expected <- data.frame(row.names = 1:2) - expected$"struct(a, b)" <- list(listToStruct(list(a = 1L, b = 2L)), - listToStruct(list(a = 4L, b = 5L))) + expected$"d" <- list(listToStruct(list(a = 1L, b = 2L)), + listToStruct(list(a = 4L, b = 5L))) expect_equal(result, expected) # Test encode(), decode() http://git-wip-us.apache.org/repos/asf/spark/blob/f151bd1a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
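For illustration, a minimal self-contained sketch of the rewrite described above: a positional `CreateStruct` becomes a `CreateNamedStruct` whose children alternate between names and values, and a `NamePlaceHolder` stands in until the underlying expression is resolved. The types and helper names below (`Expr`, `Attribute`, `toNamedStruct`, `resolvePlaceholders`) are illustrative assumptions, not Spark's actual Catalyst classes.

```scala
// Toy model of the CreateStruct -> CreateNamedStruct rewrite (not Spark's Catalyst code).
sealed trait Expr
case class Literal(value: Any) extends Expr
case class Attribute(name: String, resolved: Boolean) extends Expr
case object NamePlaceholder extends Expr                              // name not known yet
case class CreateStruct(children: Seq[Expr]) extends Expr
case class CreateNamedStruct(nameValuePairs: Seq[Expr]) extends Expr  // name1, v1, name2, v2, ...

// Items 2 and 3 of the PR, sketched: eliminate CreateStruct by interleaving names and
// values, using NamePlaceholder when a column name cannot be extracted yet.
def toNamedStruct(cs: CreateStruct): CreateNamedStruct =
  CreateNamedStruct(cs.children.zipWithIndex.flatMap {
    case (a @ Attribute(name, true), _) => Seq(Literal(name), a)
    case (a @ Attribute(_, false), _)   => Seq(NamePlaceholder, a)
    case (other, i)                     => Seq(Literal(s"col${i + 1}"), other)
  })

// Item 4, sketched: a later rule replaces placeholders once the expression is resolved.
def resolvePlaceholders(ns: CreateNamedStruct): CreateNamedStruct =
  CreateNamedStruct(ns.nameValuePairs.grouped(2).flatMap {
    case Seq(NamePlaceholder, a @ Attribute(name, true)) => Seq(Literal(name), a)
    case pair                                            => pair
  }.toSeq)
```

With `CreateStruct` eliminated from resolved trees, `CleanupAliases` no longer needs any special handling for struct columns.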
spark git commit: [SPARK-18076][CORE][SQL] Fix default Locale used in DateFormat, NumberFormat to Locale.US
Repository: spark Updated Branches: refs/heads/branch-2.1 ab8da1413 -> 176afa5e8 [SPARK-18076][CORE][SQL] Fix default Locale used in DateFormat, NumberFormat to Locale.US ## What changes were proposed in this pull request? Fix `Locale.US` for all usages of `DateFormat`, `NumberFormat` ## How was this patch tested? Existing tests. Author: Sean OwenCloses #15610 from srowen/SPARK-18076. (cherry picked from commit 9c8deef64efee20a0ddc9b612f90e77c80aede60) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/176afa5e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/176afa5e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/176afa5e Branch: refs/heads/branch-2.1 Commit: 176afa5e8b207e28a16e1b22280ed05c10b7b486 Parents: ab8da14 Author: Sean Owen Authored: Wed Nov 2 09:39:15 2016 + Committer: Sean Owen Committed: Wed Nov 2 09:43:33 2016 + -- .../org/apache/spark/SparkHadoopWriter.scala| 8 +++ .../apache/spark/deploy/SparkHadoopUtil.scala | 4 ++-- .../org/apache/spark/deploy/master/Master.scala | 5 ++-- .../org/apache/spark/deploy/worker/Worker.scala | 4 ++-- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 5 ++-- .../org/apache/spark/rdd/NewHadoopRDD.scala | 4 ++-- .../org/apache/spark/rdd/PairRDDFunctions.scala | 4 ++-- .../status/api/v1/JacksonMessageWriter.scala| 4 ++-- .../spark/status/api/v1/SimpleDateParam.scala | 6 ++--- .../scala/org/apache/spark/ui/UIUtils.scala | 3 ++- .../spark/util/logging/RollingPolicy.scala | 6 ++--- .../org/apache/spark/util/UtilsSuite.scala | 2 +- .../deploy/rest/mesos/MesosRestServer.scala | 11 - .../mllib/pmml/export/PMMLModelExport.scala | 4 ++-- .../expressions/datetimeExpressions.scala | 17 +++--- .../expressions/stringExpressions.scala | 2 +- .../spark/sql/catalyst/json/JSONOptions.scala | 6 +++-- .../spark/sql/catalyst/util/DateTimeUtils.scala | 6 ++--- .../expressions/DateExpressionsSuite.scala | 24 ++-- .../sql/catalyst/util/DateTimeUtilsSuite.scala | 6 ++--- .../datasources/csv/CSVInferSchema.scala| 4 ++-- .../execution/datasources/csv/CSVOptions.scala | 5 ++-- .../spark/sql/execution/metric/SQLMetrics.scala | 2 +- .../spark/sql/execution/streaming/socket.scala | 4 ++-- .../apache/spark/sql/DateFunctionsSuite.scala | 11 + .../execution/datasources/csv/CSVSuite.scala| 9 .../datasources/csv/CSVTypeCastSuite.scala | 9 .../hive/execution/InsertIntoHiveTable.scala| 9 +++- .../spark/sql/hive/hiveWriterContainers.scala | 4 ++-- .../spark/sql/sources/SimpleTextRelation.scala | 3 ++- .../org/apache/spark/streaming/ui/UIUtils.scala | 8 --- 31 files changed, 103 insertions(+), 96 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/176afa5e/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 6550d70..7f75a39 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -20,7 +20,7 @@ package org.apache.spark import java.io.IOException import java.text.NumberFormat import java.text.SimpleDateFormat -import java.util.Date +import java.util.{Date, Locale} import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path @@ -67,12 +67,12 @@ class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable { def setup(jobid: Int, splitid: Int, attemptid: Int) { setIDs(jobid, splitid, attemptid) 
-HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmmss").format(now), +HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(now), jobid, splitID, attemptID, conf.value) } def open() { -val numfmt = NumberFormat.getInstance() +val numfmt = NumberFormat.getInstance(Locale.US) numfmt.setMinimumIntegerDigits(5) numfmt.setGroupingUsed(false) @@ -162,7 +162,7 @@ class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable { private[spark] object SparkHadoopWriter { def createJobID(time: Date, id: Int): JobID = { -val formatter = new SimpleDateFormat("yyyyMMddHHmmss") +val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US) val jobtrackerID = formatter.format(time) new JobID(jobtrackerID, id) }
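As a standalone illustration of why the locale is pinned: `SparkHadoopWriter` derives partition file names from `NumberFormat`, and grouping separators vary by locale, so a default-locale instance could produce different names on different JVMs. A minimal sketch, assuming nothing beyond the JDK:

```scala
import java.text.NumberFormat
import java.util.Locale

object LocaleNumberFormatDemo {
  def main(args: Array[String]): Unit = {
    // With the JVM default locale, NumberFormat may insert locale-specific grouping
    // separators (for example "10.000" under de-DE vs "10,000" under en-US). Pinning
    // Locale.US and disabling grouping gives stable names such as "part-00001" everywhere.
    val numfmt = NumberFormat.getInstance(Locale.US)
    numfmt.setMinimumIntegerDigits(5)
    numfmt.setGroupingUsed(false)
    println("part-" + numfmt.format(1L))   // part-00001, independent of the default locale
  }
}
```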
spark git commit: [SPARK-18198][DOC][STREAMING] Highlight code snippets
Repository: spark Updated Branches: refs/heads/branch-2.1 3b624bedf -> ab8da1413 [SPARK-18198][DOC][STREAMING] Highlight code snippets ## What changes were proposed in this pull request? This patch uses `{% highlight lang %}...{% endhighlight %}` to highlight code snippets in the `Structured Streaming Kafka010 integration doc` and the `Spark Streaming Kafka010 integration doc`. This patch consists of two commits: - the first commit fixes only the leading spaces -- this is large - the second commit adds the highlight instructions -- this is much simpler and easier to review ## How was this patch tested? SKIP_API=1 jekyll build ## Screenshots **Before** ![snip20161101_3](https://cloud.githubusercontent.com/assets/15843379/19894258/47746524-a087-11e6-9a2a-7bff2d428d44.png) **After** ![snip20161101_1](https://cloud.githubusercontent.com/assets/15843379/19894324/8bebcd1e-a087-11e6-835b-88c4d2979cfa.png) Author: Liwei LinCloses #15715 from lw-lin/doc-highlight-code-snippet. (cherry picked from commit 98ede49496d0d7b4724085083d4f24436b92a7bf) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab8da141 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab8da141 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab8da141 Branch: refs/heads/branch-2.1 Commit: ab8da1413836591fecbc75a2515875bf3e50527f Parents: 3b624be Author: Liwei Lin Authored: Wed Nov 2 09:10:34 2016 + Committer: Sean Owen Committed: Wed Nov 2 09:41:31 2016 + -- docs/streaming-kafka-0-10-integration.md | 391 +++- docs/structured-streaming-kafka-integration.md | 156 2 files changed, 287 insertions(+), 260 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ab8da141/docs/streaming-kafka-0-10-integration.md -- diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md index c1ef396..b645d3c 100644 --- a/docs/streaming-kafka-0-10-integration.md +++ b/docs/streaming-kafka-0-10-integration.md @@ -17,69 +17,72 @@ For Scala/Java applications using SBT/Maven project definitions, link your strea - import org.apache.kafka.clients.consumer.ConsumerRecord - import org.apache.kafka.common.serialization.StringDeserializer - import org.apache.spark.streaming.kafka010._ - import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent - import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe - - val kafkaParams = Map[String, Object]( - "bootstrap.servers" -> "localhost:9092,anotherhost:9092", - "key.deserializer" -> classOf[StringDeserializer], - "value.deserializer" -> classOf[StringDeserializer], - "group.id" -> "use_a_separate_group_id_for_each_stream", - "auto.offset.reset" -> "latest", - "enable.auto.commit" -> (false: java.lang.Boolean) - ) - - val topics = Array("topicA", "topicB") - val stream = KafkaUtils.createDirectStream[String, String]( - streamingContext, - PreferConsistent, - Subscribe[String, String](topics, kafkaParams) - ) - - stream.map(record => (record.key, record.value)) - +{% highlight scala %} +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.spark.streaming.kafka010._ +import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent +import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe + +val kafkaParams = Map[String, Object]( + "bootstrap.servers" -> "localhost:9092,anotherhost:9092", + "key.deserializer" -> 
classOf[StringDeserializer], + "value.deserializer" -> classOf[StringDeserializer], + "group.id" -> "use_a_separate_group_id_for_each_stream", + "auto.offset.reset" -> "latest", + "enable.auto.commit" -> (false: java.lang.Boolean) +) + +val topics = Array("topicA", "topicB") +val stream = KafkaUtils.createDirectStream[String, String]( + streamingContext, + PreferConsistent, + Subscribe[String, String](topics, kafkaParams) +) + +stream.map(record => (record.key, record.value)) +{% endhighlight %} Each item in the stream is a [ConsumerRecord](http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html) - import java.util.*; - import org.apache.spark.SparkConf; - import org.apache.spark.TaskContext; - import org.apache.spark.api.java.*; - import org.apache.spark.api.java.function.*; - import org.apache.spark.streaming.api.java.*; - import
spark git commit: [SPARK-18076][CORE][SQL] Fix default Locale used in DateFormat, NumberFormat to Locale.US
Repository: spark Updated Branches: refs/heads/master 70a5db7bb -> 9c8deef64 [SPARK-18076][CORE][SQL] Fix default Locale used in DateFormat, NumberFormat to Locale.US ## What changes were proposed in this pull request? Fix `Locale.US` for all usages of `DateFormat`, `NumberFormat` ## How was this patch tested? Existing tests. Author: Sean OwenCloses #15610 from srowen/SPARK-18076. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c8deef6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c8deef6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c8deef6 Branch: refs/heads/master Commit: 9c8deef64efee20a0ddc9b612f90e77c80aede60 Parents: 70a5db7 Author: Sean Owen Authored: Wed Nov 2 09:39:15 2016 + Committer: Sean Owen Committed: Wed Nov 2 09:39:15 2016 + -- .../org/apache/spark/SparkHadoopWriter.scala| 8 +++ .../apache/spark/deploy/SparkHadoopUtil.scala | 4 ++-- .../org/apache/spark/deploy/master/Master.scala | 5 ++-- .../org/apache/spark/deploy/worker/Worker.scala | 4 ++-- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 5 ++-- .../org/apache/spark/rdd/NewHadoopRDD.scala | 4 ++-- .../org/apache/spark/rdd/PairRDDFunctions.scala | 4 ++-- .../status/api/v1/JacksonMessageWriter.scala| 4 ++-- .../spark/status/api/v1/SimpleDateParam.scala | 6 ++--- .../scala/org/apache/spark/ui/UIUtils.scala | 3 ++- .../spark/util/logging/RollingPolicy.scala | 6 ++--- .../org/apache/spark/util/UtilsSuite.scala | 2 +- .../deploy/rest/mesos/MesosRestServer.scala | 11 - .../mllib/pmml/export/PMMLModelExport.scala | 4 ++-- .../expressions/datetimeExpressions.scala | 17 +++--- .../expressions/stringExpressions.scala | 2 +- .../spark/sql/catalyst/json/JSONOptions.scala | 6 +++-- .../spark/sql/catalyst/util/DateTimeUtils.scala | 6 ++--- .../expressions/DateExpressionsSuite.scala | 24 ++-- .../sql/catalyst/util/DateTimeUtilsSuite.scala | 6 ++--- .../datasources/csv/CSVInferSchema.scala| 4 ++-- .../execution/datasources/csv/CSVOptions.scala | 5 ++-- .../spark/sql/execution/metric/SQLMetrics.scala | 2 +- .../spark/sql/execution/streaming/socket.scala | 4 ++-- .../apache/spark/sql/DateFunctionsSuite.scala | 11 + .../execution/datasources/csv/CSVSuite.scala| 9 .../datasources/csv/CSVTypeCastSuite.scala | 9 .../hive/execution/InsertIntoHiveTable.scala| 9 +++- .../spark/sql/hive/hiveWriterContainers.scala | 4 ++-- .../spark/sql/sources/SimpleTextRelation.scala | 3 ++- .../org/apache/spark/streaming/ui/UIUtils.scala | 8 --- 31 files changed, 103 insertions(+), 96 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9c8deef6/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 6550d70..7f75a39 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -20,7 +20,7 @@ package org.apache.spark import java.io.IOException import java.text.NumberFormat import java.text.SimpleDateFormat -import java.util.Date +import java.util.{Date, Locale} import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path @@ -67,12 +67,12 @@ class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable { def setup(jobid: Int, splitid: Int, attemptid: Int) { setIDs(jobid, splitid, attemptid) -HadoopRDD.addLocalConfiguration(new SimpleDateFormat("MMddHHmmss").format(now), 
+HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(now), jobid, splitID, attemptID, conf.value) } def open() { -val numfmt = NumberFormat.getInstance() +val numfmt = NumberFormat.getInstance(Locale.US) numfmt.setMinimumIntegerDigits(5) numfmt.setGroupingUsed(false) @@ -162,7 +162,7 @@ class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable { private[spark] object SparkHadoopWriter { def createJobID(time: Date, id: Int): JobID = { -val formatter = new SimpleDateFormat("yyyyMMddHHmmss") +val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US) val jobtrackerID = formatter.format(time) new JobID(jobtrackerID, id) } http://git-wip-us.apache.org/repos/asf/spark/blob/9c8deef6/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
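The `DateFormat` half of the change matters for the same reason: month and day names are locale-dependent, so any pattern containing `MMM` or `EEE` can format or parse differently depending on the JVM's default locale. A small sketch, assuming only the JDK (the exact default-locale output depends on the machine it runs on):

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale, TimeZone}

object LocaleDateFormatDemo {
  def main(args: Array[String]): Unit = {
    val epoch = new Date(0L)
    // Locale-dependent: on a fr-FR JVM this prints something like "01 janv. 1970".
    val defaultLocale = new SimpleDateFormat("dd MMM yyyy").format(epoch)
    // Locale pinned (time zone fixed here only to make the output deterministic).
    val pinned = new SimpleDateFormat("dd MMM yyyy", Locale.US)
    pinned.setTimeZone(TimeZone.getTimeZone("UTC"))
    println("default locale: " + defaultLocale)
    println("Locale.US:      " + pinned.format(epoch))   // always "01 Jan 1970"
  }
}
```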
spark git commit: [SPARK-18133][BRANCH-2.0][EXAMPLES][ML] Python ML Pipeline Exampl…
Repository: spark Updated Branches: refs/heads/branch-2.0 81f080425 -> 09178b6ee [SPARK-18133][BRANCH-2.0][EXAMPLES][ML] Python ML Pipeline Exampl… ## What changes were proposed in this pull request? [Fix] [branch-2.0] In Python 3, there is only one integer type (i.e., int), which mostly behaves like the long type in Python 2. Since Python 3 won't accept "L", so removed "L" in all examples. ## How was this patch tested? Unit tests. …e has syntax errors] Author: JagadeesanCloses #15729 from jagadeesanas2/SPARK-18133_branch2.0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/09178b6e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/09178b6e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/09178b6e Branch: refs/heads/branch-2.0 Commit: 09178b6eefd33011c3e90164356a5d6c3ae737bd Parents: 81f0804 Author: Jagadeesan Authored: Wed Nov 2 09:23:30 2016 + Committer: Sean Owen Committed: Wed Nov 2 09:23:30 2016 + -- examples/src/main/python/ml/cross_validator.py | 8 examples/src/main/python/ml/pipeline_example.py | 16 .../mllib/binary_classification_metrics_example.py | 2 +- .../python/mllib/multi_class_metrics_example.py | 2 +- 4 files changed, 14 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/09178b6e/examples/src/main/python/ml/cross_validator.py -- diff --git a/examples/src/main/python/ml/cross_validator.py b/examples/src/main/python/ml/cross_validator.py index a41df6c..283db5d 100644 --- a/examples/src/main/python/ml/cross_validator.py +++ b/examples/src/main/python/ml/cross_validator.py @@ -83,10 +83,10 @@ if __name__ == "__main__": # Prepare test documents, which are unlabeled. test = spark.createDataFrame([ -(4L, "spark i j k"), -(5L, "l m n"), -(6L, "mapreduce spark"), -(7L, "apache hadoop") +(4, "spark i j k"), +(5, "l m n"), +(6, "mapreduce spark"), +(7, "apache hadoop") ], ["id", "text"]) # Make predictions on test documents. cvModel uses the best model found (lrModel). http://git-wip-us.apache.org/repos/asf/spark/blob/09178b6e/examples/src/main/python/ml/pipeline_example.py -- diff --git a/examples/src/main/python/ml/pipeline_example.py b/examples/src/main/python/ml/pipeline_example.py index bd10cfd..1926cd2 100644 --- a/examples/src/main/python/ml/pipeline_example.py +++ b/examples/src/main/python/ml/pipeline_example.py @@ -35,10 +35,10 @@ if __name__ == "__main__": # $example on$ # Prepare training documents from a list of (id, text, label) tuples. training = spark.createDataFrame([ -(0L, "a b c d e spark", 1.0), -(1L, "b d", 0.0), -(2L, "spark f g h", 1.0), -(3L, "hadoop mapreduce", 0.0)], ["id", "text", "label"]) +(0, "a b c d e spark", 1.0), +(1, "b d", 0.0), +(2, "spark f g h", 1.0), +(3, "hadoop mapreduce", 0.0)], ["id", "text", "label"]) # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. tokenizer = Tokenizer(inputCol="text", outputCol="words") @@ -51,10 +51,10 @@ if __name__ == "__main__": # Prepare test documents, which are unlabeled (id, text) tuples. test = spark.createDataFrame([ -(4L, "spark i j k"), -(5L, "l m n"), -(6L, "mapreduce spark"), -(7L, "apache hadoop")], ["id", "text"]) +(4, "spark i j k"), +(5, "l m n"), +(6, "mapreduce spark"), +(7, "apache hadoop")], ["id", "text"]) # Make predictions on test documents and print columns of interest. 
prediction = model.transform(test) http://git-wip-us.apache.org/repos/asf/spark/blob/09178b6e/examples/src/main/python/mllib/binary_classification_metrics_example.py -- diff --git a/examples/src/main/python/mllib/binary_classification_metrics_example.py b/examples/src/main/python/mllib/binary_classification_metrics_example.py index daf000e..91f8378 100644 --- a/examples/src/main/python/mllib/binary_classification_metrics_example.py +++ b/examples/src/main/python/mllib/binary_classification_metrics_example.py @@ -39,7 +39,7 @@ if __name__ == "__main__": .rdd.map(lambda row: LabeledPoint(row[0], row[1])) # Split data into training (60%) and test (40%) -training, test = data.randomSplit([0.6, 0.4], seed=11L) +training, test = data.randomSplit([0.6, 0.4], seed=11) training.cache() # Run training algorithm to build the
spark git commit: [SPARK-18204][WEBUI] Remove SparkUI.appUIAddress
Repository: spark Updated Branches: refs/heads/master 98ede4949 -> 70a5db7bb [SPARK-18204][WEBUI] Remove SparkUI.appUIAddress ## What changes were proposed in this pull request? Removing `appUIAddress` attribute since it is no longer in use. ## How was this patch tested? Local build Author: Jacek LaskowskiCloses #15603 from jaceklaskowski/sparkui-fixes. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70a5db7b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70a5db7b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70a5db7b Branch: refs/heads/master Commit: 70a5db7bbd192a4bc68bcfdc475ab221adf2fcdd Parents: 98ede49 Author: Jacek Laskowski Authored: Wed Nov 2 09:21:26 2016 + Committer: Sean Owen Committed: Wed Nov 2 09:21:26 2016 + -- .../cluster/StandaloneSchedulerBackend.scala| 6 +++--- .../main/scala/org/apache/spark/ui/SparkUI.scala| 13 +++-- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 8 .../org/apache/spark/ui/jobs/AllJobsPage.scala | 4 ++-- .../scala/org/apache/spark/ui/UISeleniumSuite.scala | 16 .../test/scala/org/apache/spark/ui/UISuite.scala| 13 ++--- .../mesos/MesosCoarseGrainedSchedulerBackend.scala | 2 +- .../mesos/MesosFineGrainedSchedulerBackend.scala| 2 +- .../apache/spark/streaming/UISeleniumSuite.scala| 12 ++-- .../spark/deploy/yarn/ApplicationMaster.scala | 2 +- .../cluster/YarnClientSchedulerBackend.scala| 2 +- 11 files changed, 36 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/70a5db7b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 04d40e2..368cd30 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -93,7 +93,7 @@ private[spark] class StandaloneSchedulerBackend( val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) -val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") +val webUrl = sc.ui.map(_.webUrl).getOrElse("") val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt) // If we're using dynamic allocation, set our initial executor limit to 0 for now. // ExecutorAllocationManager will send the real initial limit to the Master later. 
@@ -103,8 +103,8 @@ private[spark] class StandaloneSchedulerBackend( } else { None } -val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit) +val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, + webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit) client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf) client.start() launcherBackend.setState(SparkAppHandle.State.SUBMITTED) http://git-wip-us.apache.org/repos/asf/spark/blob/70a5db7b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index f631a04..b828532 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -82,7 +82,7 @@ private[spark] class SparkUI private ( initialize() def getSparkUser: String = { - environmentListener.systemProperties.toMap.get("user.name").getOrElse("") +environmentListener.systemProperties.toMap.getOrElse("user.name", "") } def getAppName: String = appName @@ -94,16 +94,9 @@ private[spark] class SparkUI private ( /** Stop the server behind this web interface. Only valid after bind(). */ override def stop() { super.stop() -logInfo("Stopped Spark web UI at %s".format(appUIAddress)) +logInfo(s"Stopped Spark web UI at $webUrl") } - /** - * Return the application UI host:port. This does not include the scheme (http://). - */ - private[spark] def appUIHostPort =
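A compact sketch of the consolidated pattern, for context: the base web UI already knows how to build its own URL, so callers only need one accessor. The class and field names below (`WebUISketch`, `publicHostName`, `boundPort`) are illustrative assumptions rather than Spark's actual `WebUI` internals.

```scala
// Illustrative only: a single webUrl accessor on the base UI replaces the
// SparkUI-specific appUIAddress that call sites used to go through.
abstract class WebUISketch(scheme: String, publicHostName: String) {
  protected var boundPort: Int = -1          // assigned when the server binds
  def bind(port: Int): Unit = { boundPort = port }
  def webUrl: String = s"$scheme$publicHostName:$boundPort"
}

object WebUrlDemo {
  class SparkUISketch(host: String) extends WebUISketch("http://", host)

  def main(args: Array[String]): Unit = {
    val ui = new SparkUISketch("driver-host")
    ui.bind(4040)
    // Call sites such as StandaloneSchedulerBackend now simply use webUrl:
    val webUrl = Option(ui).map(_.webUrl).getOrElse("")
    println(webUrl)                          // http://driver-host:4040
  }
}
```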
spark git commit: [SPARK-18198][DOC][STREAMING] Highlight code snippets
Repository: spark Updated Branches: refs/heads/master bcbe0 -> 98ede4949 [SPARK-18198][DOC][STREAMING] Highlight code snippets ## What changes were proposed in this pull request? This patch uses `{% highlight lang %}...{% endhighlight %}` to highlight code snippets in the `Structured Streaming Kafka010 integration doc` and the `Spark Streaming Kafka010 integration doc`. This patch consists of two commits: - the first commit fixes only the leading spaces -- this is large - the second commit adds the highlight instructions -- this is much simpler and easier to review ## How was this patch tested? SKIP_API=1 jekyll build ## Screenshots **Before** ![snip20161101_3](https://cloud.githubusercontent.com/assets/15843379/19894258/47746524-a087-11e6-9a2a-7bff2d428d44.png) **After** ![snip20161101_1](https://cloud.githubusercontent.com/assets/15843379/19894324/8bebcd1e-a087-11e6-835b-88c4d2979cfa.png) Author: Liwei LinCloses #15715 from lw-lin/doc-highlight-code-snippet. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98ede494 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98ede494 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98ede494 Branch: refs/heads/master Commit: 98ede49496d0d7b4724085083d4f24436b92a7bf Parents: bcbe444 Author: Liwei Lin Authored: Wed Nov 2 09:10:34 2016 + Committer: Sean Owen Committed: Wed Nov 2 09:10:34 2016 + -- docs/streaming-kafka-0-10-integration.md | 391 +++- docs/structured-streaming-kafka-integration.md | 156 2 files changed, 287 insertions(+), 260 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/98ede494/docs/streaming-kafka-0-10-integration.md -- diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md index c1ef396..b645d3c 100644 --- a/docs/streaming-kafka-0-10-integration.md +++ b/docs/streaming-kafka-0-10-integration.md @@ -17,69 +17,72 @@ For Scala/Java applications using SBT/Maven project definitions, link your strea - import org.apache.kafka.clients.consumer.ConsumerRecord - import org.apache.kafka.common.serialization.StringDeserializer - import org.apache.spark.streaming.kafka010._ - import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent - import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe - - val kafkaParams = Map[String, Object]( - "bootstrap.servers" -> "localhost:9092,anotherhost:9092", - "key.deserializer" -> classOf[StringDeserializer], - "value.deserializer" -> classOf[StringDeserializer], - "group.id" -> "use_a_separate_group_id_for_each_stream", - "auto.offset.reset" -> "latest", - "enable.auto.commit" -> (false: java.lang.Boolean) - ) - - val topics = Array("topicA", "topicB") - val stream = KafkaUtils.createDirectStream[String, String]( - streamingContext, - PreferConsistent, - Subscribe[String, String](topics, kafkaParams) - ) - - stream.map(record => (record.key, record.value)) - +{% highlight scala %} +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.spark.streaming.kafka010._ +import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent +import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe + +val kafkaParams = Map[String, Object]( + "bootstrap.servers" -> "localhost:9092,anotherhost:9092", + "key.deserializer" -> classOf[StringDeserializer], + "value.deserializer" -> classOf[StringDeserializer], + "group.id" -> 
"use_a_separate_group_id_for_each_stream", + "auto.offset.reset" -> "latest", + "enable.auto.commit" -> (false: java.lang.Boolean) +) + +val topics = Array("topicA", "topicB") +val stream = KafkaUtils.createDirectStream[String, String]( + streamingContext, + PreferConsistent, + Subscribe[String, String](topics, kafkaParams) +) + +stream.map(record => (record.key, record.value)) +{% endhighlight %} Each item in the stream is a [ConsumerRecord](http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html) - import java.util.*; - import org.apache.spark.SparkConf; - import org.apache.spark.TaskContext; - import org.apache.spark.api.java.*; - import org.apache.spark.api.java.function.*; - import org.apache.spark.streaming.api.java.*; - import org.apache.spark.streaming.kafka010.*; - import org.apache.kafka.clients.consumer.ConsumerRecord; - import
spark git commit: [MINOR] Use <= for clarity in Pi examples' Monte Carlo process
Repository: spark Updated Branches: refs/heads/master 2dc048081 -> bcbe0 [MINOR] Use <= for clarity in Pi examples' Monte Carlo process ## What changes were proposed in this pull request? If my understanding is correct we should be rather looking at closed disk than the opened one. ## How was this patch tested? Run simple comparison, of the mean squared error of approaches with closed and opened disk. https://gist.github.com/mrydzy/1cf0e5c316ef9d6fbd91426b91f1969f The closed one performed slightly better, but the tested sample wasn't too big, so I rely mostly on the algorithm understanding. Author: Maria RydzyCloses #15687 from mrydzy/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bcbe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bcbe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bcbe Branch: refs/heads/master Commit: bcbe0e6c871e217f06d2a4696fd41f1d2606 Parents: 2dc0480 Author: Maria Rydzy Authored: Wed Nov 2 09:09:16 2016 + Committer: Sean Owen Committed: Wed Nov 2 09:09:16 2016 + -- examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java | 2 +- examples/src/main/python/pi.py| 2 +- examples/src/main/scala/org/apache/spark/examples/LocalPi.scala | 2 +- examples/src/main/scala/org/apache/spark/examples/SparkPi.scala | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bcbe/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java index 7df145e..89855e8 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java @@ -54,7 +54,7 @@ public final class JavaSparkPi { public Integer call(Integer integer) { double x = Math.random() * 2 - 1; double y = Math.random() * 2 - 1; -return (x * x + y * y < 1) ? 1 : 0; +return (x * x + y * y <= 1) ? 
1 : 0; } }).reduce(new Function2<Integer, Integer, Integer>() { @Override http://git-wip-us.apache.org/repos/asf/spark/blob/bcbe/examples/src/main/python/pi.py -- diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py index e3f0c4a..37029b7 100755 --- a/examples/src/main/python/pi.py +++ b/examples/src/main/python/pi.py @@ -38,7 +38,7 @@ if __name__ == "__main__": def f(_): x = random() * 2 - 1 y = random() * 2 - 1 -return 1 if x ** 2 + y ** 2 < 1 else 0 +return 1 if x ** 2 + y ** 2 <= 1 else 0 count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add) print("Pi is roughly %f" % (4.0 * count / n)) http://git-wip-us.apache.org/repos/asf/spark/blob/bcbe/examples/src/main/scala/org/apache/spark/examples/LocalPi.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalPi.scala b/examples/src/main/scala/org/apache/spark/examples/LocalPi.scala index 720d92f..121b768 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalPi.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalPi.scala @@ -26,7 +26,7 @@ object LocalPi { for (i <- 1 to 100000) { val x = random * 2 - 1 val y = random * 2 - 1 - if (x*x + y*y < 1) count += 1 + if (x*x + y*y <= 1) count += 1 } println("Pi is roughly " + 4 * count / 100000.0) } http://git-wip-us.apache.org/repos/asf/spark/blob/bcbe/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala index 272c1a4..a5cacf1 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala @@ -34,7 +34,7 @@ object SparkPi { val count = spark.sparkContext.parallelize(1 until n, slices).map { i => val x = random * 2 - 1 val y = random * 2 - 1 - if (x*x + y*y < 1) 1 else 0 + if (x*x + y*y <= 1) 1 else 0 }.reduce(_ + _) println("Pi is roughly " + 4.0 * count / (n - 1)) spark.stop()
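For reference, the whole estimator in one self-contained snippet, using the closed disk as in the change above: a point on the boundary now counts as a hit, and the hit ratio times 4 approximates pi because the unit disk covers pi/4 of the [-1, 1] x [-1, 1] square.

```scala
import scala.util.Random

object LocalPiSketch {
  def main(args: Array[String]): Unit = {
    val n = 1000000
    val hits = (1 to n).count { _ =>
      val x = Random.nextDouble() * 2 - 1
      val y = Random.nextDouble() * 2 - 1
      x * x + y * y <= 1                     // closed disk: boundary points are hits
    }
    println("Pi is roughly " + 4.0 * hits / n)
  }
}
```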
spark git commit: [SPARK-17532] Add lock debugging info to thread dumps.
Repository: spark Updated Branches: refs/heads/branch-2.1 4c4bf87ac -> 3b624bedf [SPARK-17532] Add lock debugging info to thread dumps. ## What changes were proposed in this pull request? This adds information to the web UI thread dump page about the JVM locks held by threads and the locks that threads are blocked waiting to acquire. This should help find cases where lock contention is causing Spark applications to run slowly. ## How was this patch tested? Tested by applying this patch and viewing the change in the web UI. ![thread-lock-info](https://cloud.githubusercontent.com/assets/87915/18493057/6e5da870-79c3-11e6-8c20-f54c18a37544.png) Additions: - A "Thread Locking" column with the locks held by the thread or that are blocking the thread - Links from the a blocked thread to the thread holding the lock - Stack frames show where threads are inside `synchronized` blocks, "holding Monitor(...)" Author: Ryan BlueCloses #15088 from rdblue/SPARK-17532-add-thread-lock-info. (cherry picked from commit 2dc048081668665f85623839d5f663b402e42555) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b624bed Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b624bed Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b624bed Branch: refs/heads/branch-2.1 Commit: 3b624bedf0f0ecd5dcfcc262a3ca8b4e33662533 Parents: 4c4bf87 Author: Ryan Blue Authored: Wed Nov 2 00:08:30 2016 -0700 Committer: Reynold Xin Committed: Wed Nov 2 00:08:37 2016 -0700 -- .../org/apache/spark/ui/static/table.js | 3 +- .../spark/ui/exec/ExecutorThreadDumpPage.scala | 12 +++ .../apache/spark/util/ThreadStackTrace.scala| 6 +++- .../scala/org/apache/spark/util/Utils.scala | 34 +--- 4 files changed, 49 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3b624bed/core/src/main/resources/org/apache/spark/ui/static/table.js -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/table.js b/core/src/main/resources/org/apache/spark/ui/static/table.js index 14b06bf..0315ebf 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/table.js +++ b/core/src/main/resources/org/apache/spark/ui/static/table.js @@ -36,7 +36,7 @@ function toggleThreadStackTrace(threadId, forceAdd) { if (stackTrace.length == 0) { var stackTraceText = $('#' + threadId + "_td_stacktrace").html() var threadCell = $("#thread_" + threadId + "_tr") -threadCell.after("" + +threadCell.after("" + stackTraceText + "") } else { if (!forceAdd) { @@ -73,6 +73,7 @@ function onMouseOverAndOut(threadId) { $("#" + threadId + "_td_id").toggleClass("threaddump-td-mouseover"); $("#" + threadId + "_td_name").toggleClass("threaddump-td-mouseover"); $("#" + threadId + "_td_state").toggleClass("threaddump-td-mouseover"); +$("#" + threadId + "_td_locking").toggleClass("threaddump-td-mouseover"); } function onSearchStringChange() { http://git-wip-us.apache.org/repos/asf/spark/blob/3b624bed/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala index a0ef80d..c6a0744 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala @@ -48,6 +48,16 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage } }.map { thread => val 
threadId = thread.threadId +val blockedBy = thread.blockedByThreadId match { + case Some(blockedByThreadId) => + + Blocked by + Thread {thread.blockedByThreadId} {thread.blockedByLock} + + case None => Text("") +} +val heldLocks = thread.holdingLocks.mkString(", ") + {threadId} {thread.threadName} {thread.threadState} + {blockedBy}{heldLocks} {thread.stackTrace} } @@ -86,6 +97,7 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage Thread ID Thread Name Thread State + Thread Locks {dumpRows} http://git-wip-us.apache.org/repos/asf/spark/blob/3b624bed/core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala
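The lock information shown in the new column comes from the JVM's thread MXBean; below is a minimal standalone sketch of that collection step (the real logic lives in `Utils.getThreadDump`, which is only partially shown in this diff):

```scala
import java.lang.management.ManagementFactory

object ThreadLockDumpSketch {
  def main(args: Array[String]): Unit = {
    // dumpAllThreads(lockedMonitors = true, lockedSynchronizers = true) returns ThreadInfo
    // objects carrying the monitors/synchronizers each thread holds and, if blocked,
    // the lock it waits for plus the id of the owning thread.
    val infos = ManagementFactory.getThreadMXBean.dumpAllThreads(true, true)
    infos.foreach { ti =>
      val held = (ti.getLockedMonitors.toSeq ++ ti.getLockedSynchronizers.toSeq).mkString(", ")
      val blockedOn = Option(ti.getLockInfo)
        .map(lock => s"blocked on $lock held by thread ${ti.getLockOwnerId}")
        .getOrElse("not blocked")
      println(s"${ti.getThreadId} ${ti.getThreadName} [${ti.getThreadState}] " +
        s"locks=[$held] ($blockedOn)")
    }
  }
}
```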
spark git commit: [SPARK-17532] Add lock debugging info to thread dumps.
Repository: spark Updated Branches: refs/heads/master 85c5424d4 -> 2dc048081 [SPARK-17532] Add lock debugging info to thread dumps. ## What changes were proposed in this pull request? This adds information to the web UI thread dump page about the JVM locks held by threads and the locks that threads are blocked waiting to acquire. This should help find cases where lock contention is causing Spark applications to run slowly. ## How was this patch tested? Tested by applying this patch and viewing the change in the web UI. ![thread-lock-info](https://cloud.githubusercontent.com/assets/87915/18493057/6e5da870-79c3-11e6-8c20-f54c18a37544.png) Additions: - A "Thread Locking" column with the locks held by the thread or that are blocking the thread - Links from the a blocked thread to the thread holding the lock - Stack frames show where threads are inside `synchronized` blocks, "holding Monitor(...)" Author: Ryan BlueCloses #15088 from rdblue/SPARK-17532-add-thread-lock-info. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2dc04808 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2dc04808 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2dc04808 Branch: refs/heads/master Commit: 2dc048081668665f85623839d5f663b402e42555 Parents: 85c5424 Author: Ryan Blue Authored: Wed Nov 2 00:08:30 2016 -0700 Committer: Reynold Xin Committed: Wed Nov 2 00:08:30 2016 -0700 -- .../org/apache/spark/ui/static/table.js | 3 +- .../spark/ui/exec/ExecutorThreadDumpPage.scala | 12 +++ .../apache/spark/util/ThreadStackTrace.scala| 6 +++- .../scala/org/apache/spark/util/Utils.scala | 34 +--- 4 files changed, 49 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2dc04808/core/src/main/resources/org/apache/spark/ui/static/table.js -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/table.js b/core/src/main/resources/org/apache/spark/ui/static/table.js index 14b06bf..0315ebf 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/table.js +++ b/core/src/main/resources/org/apache/spark/ui/static/table.js @@ -36,7 +36,7 @@ function toggleThreadStackTrace(threadId, forceAdd) { if (stackTrace.length == 0) { var stackTraceText = $('#' + threadId + "_td_stacktrace").html() var threadCell = $("#thread_" + threadId + "_tr") -threadCell.after("" + +threadCell.after("" + stackTraceText + "") } else { if (!forceAdd) { @@ -73,6 +73,7 @@ function onMouseOverAndOut(threadId) { $("#" + threadId + "_td_id").toggleClass("threaddump-td-mouseover"); $("#" + threadId + "_td_name").toggleClass("threaddump-td-mouseover"); $("#" + threadId + "_td_state").toggleClass("threaddump-td-mouseover"); +$("#" + threadId + "_td_locking").toggleClass("threaddump-td-mouseover"); } function onSearchStringChange() { http://git-wip-us.apache.org/repos/asf/spark/blob/2dc04808/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala index a0ef80d..c6a0744 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala @@ -48,6 +48,16 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage } }.map { thread => val threadId = thread.threadId +val blockedBy = thread.blockedByThreadId match { + case Some(blockedByThreadId) => 
+ + Blocked by + Thread {thread.blockedByThreadId} {thread.blockedByLock} + + case None => Text("") +} +val heldLocks = thread.holdingLocks.mkString(", ") + {threadId} {thread.threadName} {thread.threadState} + {blockedBy}{heldLocks} {thread.stackTrace} } @@ -86,6 +97,7 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage Thread ID Thread Name Thread State + Thread Locks {dumpRows} http://git-wip-us.apache.org/repos/asf/spark/blob/2dc04808/core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala
spark git commit: [SPARK-18144][SQL] logging StreamingQueryListener$QueryStartedEvent
Repository: spark Updated Branches: refs/heads/branch-2.1 85dd07374 -> 4c4bf87ac [SPARK-18144][SQL] logging StreamingQueryListener$QueryStartedEvent ## What changes were proposed in this pull request? The PR fixes the bug that the QueryStartedEvent is not logged the postToAll() in the original code is actually calling StreamingQueryListenerBus.postToAll() which has no listener at allwe shall post by sparkListenerBus.postToAll(s) and this.postToAll() to trigger local listeners as well as the listeners registered in LiveListenerBus zsxwing ## How was this patch tested? The following snapshot shows that QueryStartedEvent has been logged correctly ![image](https://cloud.githubusercontent.com/assets/678008/19821553/007a7d28-9d2d-11e6-9f13-49851559cdaa.png) Author: CodingCatCloses #15675 from CodingCat/SPARK-18144. (cherry picked from commit 85c5424d466f4a5765c825e0e2ab30da97611285) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c4bf87a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c4bf87a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c4bf87a Branch: refs/heads/branch-2.1 Commit: 4c4bf87acf2516a72b59f4e760413f80640dca1e Parents: 85dd0737 Author: CodingCat Authored: Tue Nov 1 23:39:53 2016 -0700 Committer: Shixiong Zhu Committed: Tue Nov 1 23:40:00 2016 -0700 -- .../execution/streaming/StreamingQueryListenerBus.scala | 10 +- .../apache/spark/sql/streaming/StreamingQuerySuite.scala | 7 ++- 2 files changed, 15 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4c4bf87a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index fc2190d..22e4c63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -41,6 +41,8 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) def post(event: StreamingQueryListener.Event) { event match { case s: QueryStartedEvent => +sparkListenerBus.post(s) +// post to local listeners to trigger callbacks postToAll(s) case _ => sparkListenerBus.post(event) @@ -50,7 +52,13 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) override def onOtherEvent(event: SparkListenerEvent): Unit = { event match { case e: StreamingQueryListener.Event => -postToAll(e) +// SPARK-18144: we broadcast QueryStartedEvent to all listeners attached to this bus +// synchronously and the ones attached to LiveListenerBus asynchronously. 
Therefore, +// we need to ignore QueryStartedEvent if this method is called within SparkListenerBus +// thread +if (!LiveListenerBus.withinListenerThread.value || !e.isInstanceOf[QueryStartedEvent]) { + postToAll(e) +} case _ => } } http://git-wip-us.apache.org/repos/asf/spark/blob/4c4bf87a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 464c443..31b7fe0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -290,7 +290,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { // A StreamingQueryListener that gets the query status after the first completed trigger val listener = new StreamingQueryListener { @volatile var firstStatus: StreamingQueryStatus = null - override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { } + @volatile var queryStartedEvent = 0 + override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { +queryStartedEvent += 1 + } override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { if (firstStatus == null) firstStatus = queryProgress.queryStatus } @@ -303,6 +306,8 @@ class StreamingQuerySuite extends StreamTest with
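For the listener-side view of the fix, a minimal sketch: after this change, a listener registered through `spark.streams.addListener` receives the `QueryStartedEvent` exactly once per started query. The class name below is illustrative and mirrors the counting listener added to the test suite.

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Counts started queries; register it with spark.streams.addListener(new StartedEventCounter).
class StartedEventCounter extends StreamingQueryListener {
  @volatile var started: Int = 0
  override def onQueryStarted(event: QueryStartedEvent): Unit = { started += 1 }
  override def onQueryProgress(event: QueryProgressEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}
```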
spark git commit: [SPARK-18144][SQL] logging StreamingQueryListener$QueryStartedEvent
Repository: spark Updated Branches: refs/heads/branch-2.0 d401a74d4 -> 81f080425 [SPARK-18144][SQL] logging StreamingQueryListener$QueryStartedEvent ## What changes were proposed in this pull request? The PR fixes the bug that the QueryStartedEvent is not logged the postToAll() in the original code is actually calling StreamingQueryListenerBus.postToAll() which has no listener at allwe shall post by sparkListenerBus.postToAll(s) and this.postToAll() to trigger local listeners as well as the listeners registered in LiveListenerBus zsxwing ## How was this patch tested? The following snapshot shows that QueryStartedEvent has been logged correctly ![image](https://cloud.githubusercontent.com/assets/678008/19821553/007a7d28-9d2d-11e6-9f13-49851559cdaa.png) Author: CodingCatCloses #15675 from CodingCat/SPARK-18144. (cherry picked from commit 85c5424d466f4a5765c825e0e2ab30da97611285) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81f08042 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81f08042 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81f08042 Branch: refs/heads/branch-2.0 Commit: 81f0804252bbfdc280e013e5f7b016759e66406e Parents: d401a74 Author: CodingCat Authored: Tue Nov 1 23:39:53 2016 -0700 Committer: Shixiong Zhu Committed: Tue Nov 1 23:40:12 2016 -0700 -- .../execution/streaming/StreamingQueryListenerBus.scala | 10 +- .../apache/spark/sql/streaming/StreamingQuerySuite.scala | 7 ++- 2 files changed, 15 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/81f08042/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index fc2190d..22e4c63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -41,6 +41,8 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) def post(event: StreamingQueryListener.Event) { event match { case s: QueryStartedEvent => +sparkListenerBus.post(s) +// post to local listeners to trigger callbacks postToAll(s) case _ => sparkListenerBus.post(event) @@ -50,7 +52,13 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) override def onOtherEvent(event: SparkListenerEvent): Unit = { event match { case e: StreamingQueryListener.Event => -postToAll(e) +// SPARK-18144: we broadcast QueryStartedEvent to all listeners attached to this bus +// synchronously and the ones attached to LiveListenerBus asynchronously. 
Therefore, +// we need to ignore QueryStartedEvent if this method is called within SparkListenerBus +// thread +if (!LiveListenerBus.withinListenerThread.value || !e.isInstanceOf[QueryStartedEvent]) { + postToAll(e) +} case _ => } } http://git-wip-us.apache.org/repos/asf/spark/blob/81f08042/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 464c443..31b7fe0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -290,7 +290,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { // A StreamingQueryListener that gets the query status after the first completed trigger val listener = new StreamingQueryListener { @volatile var firstStatus: StreamingQueryStatus = null - override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { } + @volatile var queryStartedEvent = 0 + override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { +queryStartedEvent += 1 + } override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { if (firstStatus == null) firstStatus = queryProgress.queryStatus } @@ -303,6 +306,8 @@ class StreamingQuerySuite extends StreamTest with
spark git commit: [SPARK-18144][SQL] logging StreamingQueryListener$QueryStartedEvent
Repository: spark Updated Branches: refs/heads/master a36653c5b -> 85c5424d4 [SPARK-18144][SQL] logging StreamingQueryListener$QueryStartedEvent ## What changes were proposed in this pull request? The PR fixes the bug that the QueryStartedEvent is not logged the postToAll() in the original code is actually calling StreamingQueryListenerBus.postToAll() which has no listener at allwe shall post by sparkListenerBus.postToAll(s) and this.postToAll() to trigger local listeners as well as the listeners registered in LiveListenerBus zsxwing ## How was this patch tested? The following snapshot shows that QueryStartedEvent has been logged correctly ![image](https://cloud.githubusercontent.com/assets/678008/19821553/007a7d28-9d2d-11e6-9f13-49851559cdaa.png) Author: CodingCatCloses #15675 from CodingCat/SPARK-18144. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/85c5424d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/85c5424d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/85c5424d Branch: refs/heads/master Commit: 85c5424d466f4a5765c825e0e2ab30da97611285 Parents: a36653c Author: CodingCat Authored: Tue Nov 1 23:39:53 2016 -0700 Committer: Shixiong Zhu Committed: Tue Nov 1 23:39:53 2016 -0700 -- .../execution/streaming/StreamingQueryListenerBus.scala | 10 +- .../apache/spark/sql/streaming/StreamingQuerySuite.scala | 7 ++- 2 files changed, 15 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/85c5424d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index fc2190d..22e4c63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -41,6 +41,8 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) def post(event: StreamingQueryListener.Event) { event match { case s: QueryStartedEvent => +sparkListenerBus.post(s) +// post to local listeners to trigger callbacks postToAll(s) case _ => sparkListenerBus.post(event) @@ -50,7 +52,13 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) override def onOtherEvent(event: SparkListenerEvent): Unit = { event match { case e: StreamingQueryListener.Event => -postToAll(e) +// SPARK-18144: we broadcast QueryStartedEvent to all listeners attached to this bus +// synchronously and the ones attached to LiveListenerBus asynchronously. 
Therefore, +// we need to ignore QueryStartedEvent if this method is called within SparkListenerBus +// thread +if (!LiveListenerBus.withinListenerThread.value || !e.isInstanceOf[QueryStartedEvent]) { + postToAll(e) +} case _ => } } http://git-wip-us.apache.org/repos/asf/spark/blob/85c5424d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 464c443..31b7fe0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -290,7 +290,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { // A StreamingQueryListener that gets the query status after the first completed trigger val listener = new StreamingQueryListener { @volatile var firstStatus: StreamingQueryStatus = null - override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { } + @volatile var queryStartedEvent = 0 + override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { +queryStartedEvent += 1 + } override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { if (firstStatus == null) firstStatus = queryProgress.queryStatus } @@ -303,6 +306,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { q.processAllAvailable() eventually(timeout(streamingTimeout)) {
spark git commit: [SPARK-18192] Support all file formats in structured streaming
Repository: spark Updated Branches: refs/heads/branch-2.1 e6509c245 -> 85dd07374 [SPARK-18192] Support all file formats in structured streaming ## What changes were proposed in this pull request? This patch adds support for all file formats in structured streaming sinks. This is actually a very small change thanks to all the previous refactoring done using the new internal commit protocol API. ## How was this patch tested? Updated FileStreamSinkSuite to add test cases for json, text, and parquet. Author: Reynold XinCloses #15711 from rxin/SPARK-18192. (cherry picked from commit a36653c5b7b2719f8bfddf4ddfc6e1b828ac9af1) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/85dd0737 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/85dd0737 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/85dd0737 Branch: refs/heads/branch-2.1 Commit: 85dd073743946383438aabb9f1281e6075f25cc5 Parents: e6509c2 Author: Reynold Xin Authored: Tue Nov 1 23:37:03 2016 -0700 Committer: Reynold Xin Committed: Tue Nov 1 23:37:11 2016 -0700 -- .../sql/execution/datasources/DataSource.scala | 8 +-- .../sql/streaming/FileStreamSinkSuite.scala | 62 +--- 2 files changed, 32 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/85dd0737/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index d980e6a..3f956c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat @@ -37,7 +36,6 @@ import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider import org.apache.spark.sql.execution.datasources.json.JsonFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.{CalendarIntervalType, StructType} @@ -292,7 +290,7 @@ case class DataSource( case s: StreamSinkProvider => s.createSink(sparkSession.sqlContext, options, partitionColumns, outputMode) - case parquet: parquet.ParquetFileFormat => + case fileFormat: FileFormat => val caseInsensitiveOptions = new CaseInsensitiveMap(options) val path = caseInsensitiveOptions.getOrElse("path", { throw new IllegalArgumentException("'path' is not specified") @@ -301,7 +299,7 @@ case class DataSource( throw new IllegalArgumentException( s"Data source $className does not support $outputMode output mode") } -new FileStreamSink(sparkSession, path, parquet, partitionColumns, options) +new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, options) case _ => throw new UnsupportedOperationException( @@ -516,7 +514,7 @@ case class 
DataSource( val plan = data.logicalPlan plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse { throw new AnalysisException( - s"Unable to resolve ${name} given [${plan.output.map(_.name).mkString(", ")}]") + s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]") }.asInstanceOf[Attribute] } // For partitioned relation r, r.schema's column ordering can be different from the column http://git-wip-us.apache.org/repos/asf/spark/blob/85dd0737/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 902cf05..0f140f9 100644 ---
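A short usage sketch of what the `FileFormat` match enables, assuming a local Spark session and writable paths (the directory names here are placeholders): any file-based format, not just parquet, can now back a structured streaming sink.

```scala
import org.apache.spark.sql.SparkSession

object JsonSinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("json-sink-sketch")
      .master("local[2]")
      .getOrCreate()
    // Watch a directory of text files as a stream and write the results out as JSON;
    // "text", "csv" and "parquet" work the same way through the shared FileStreamSink path.
    val lines = spark.readStream.text("/tmp/stream-input")
    val query = lines.writeStream
      .format("json")
      .option("checkpointLocation", "/tmp/stream-checkpoint")
      .outputMode("append")
      .start("/tmp/stream-output")
    query.awaitTermination()
  }
}
```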
spark git commit: [SPARK-18192] Support all file formats in structured streaming
Repository: spark
Updated Branches: refs/heads/master abefe2ec4 -> a36653c5b

[SPARK-18192] Support all file formats in structured streaming

## What changes were proposed in this pull request?

This patch adds support for all file formats in structured streaming sinks. This is actually a very small change thanks to all the previous refactoring done using the new internal commit protocol API.

## How was this patch tested?

Updated FileStreamSinkSuite to add test cases for json, text, and parquet.

Author: Reynold Xin

Closes #15711 from rxin/SPARK-18192.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a36653c5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a36653c5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a36653c5

Branch: refs/heads/master
Commit: a36653c5b7b2719f8bfddf4ddfc6e1b828ac9af1
Parents: abefe2e
Author: Reynold Xin
Authored: Tue Nov 1 23:37:03 2016 -0700
Committer: Reynold Xin
Committed: Tue Nov 1 23:37:03 2016 -0700
--
 .../sql/execution/datasources/DataSource.scala |  8 +--
 .../sql/streaming/FileStreamSinkSuite.scala    | 62 +---
 2 files changed, 32 insertions(+), 38 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/a36653c5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d980e6a..3f956c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
@@ -37,7 +36,6 @@ import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
@@ -292,7 +290,7 @@ case class DataSource(
       case s: StreamSinkProvider =>
         s.createSink(sparkSession.sqlContext, options, partitionColumns, outputMode)
-      case parquet: parquet.ParquetFileFormat =>
+      case fileFormat: FileFormat =>
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
@@ -301,7 +299,7 @@ case class DataSource(
           throw new IllegalArgumentException(
             s"Data source $className does not support $outputMode output mode")
         }
-        new FileStreamSink(sparkSession, path, parquet, partitionColumns, options)
+        new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, options)
       case _ =>
         throw new UnsupportedOperationException(
@@ -516,7 +514,7 @@ case class DataSource(
       val plan = data.logicalPlan
       plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
         throw new AnalysisException(
-          s"Unable to resolve ${name} given [${plan.output.map(_.name).mkString(", ")}]")
+          s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
       }.asInstanceOf[Attribute]
     }
     // For partitioned relation r, r.schema's column ordering can be different from the column

http://git-wip-us.apache.org/repos/asf/spark/blob/a36653c5/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 902cf05..0f140f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++
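To make the one-line change above easier to follow, here is a simplified, self-contained sketch (hypothetical types, not Spark's actual classes) of how matching on the general FileFormat trait instead of the Parquet class lets every file-based format share the same streaming-sink code path:

```scala
// Hypothetical, simplified stand-ins for Spark's internal types.
trait FileFormat { def name: String }
case object ParquetFormat extends FileFormat { val name = "parquet" }
case object JsonFormat extends FileFormat { val name = "json" }
case object TextFormat extends FileFormat { val name = "text" }

case class FileSink(path: String, format: FileFormat)

def createSink(providingInstance: AnyRef, options: Map[String, String]): FileSink =
  providingInstance match {
    // Before the patch the match arm only accepted the Parquet format class,
    // so only Parquet reached the file sink; matching the trait covers every format.
    case fileFormat: FileFormat =>
      val path = options.getOrElse("path",
        throw new IllegalArgumentException("'path' is not specified"))
      FileSink(path, fileFormat)
    case other =>
      throw new UnsupportedOperationException(
        s"Data source $other does not support streamed writing")
  }

// createSink(JsonFormat, Map("path" -> "/tmp/out"))  => FileSink("/tmp/out", JsonFormat)
```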
spark git commit: [SPARK-18183][SPARK-18184] Fix INSERT [INTO|OVERWRITE] TABLE ... PARTITION for Datasource tables
Repository: spark
Updated Branches: refs/heads/branch-2.1 39d2fdb51 -> e6509c245

[SPARK-18183][SPARK-18184] Fix INSERT [INTO|OVERWRITE] TABLE ... PARTITION for Datasource tables

There are a couple issues with the current 2.1 behavior when inserting into Datasource tables with partitions managed by Hive.

(1) OVERWRITE TABLE ... PARTITION will actually overwrite the entire table instead of just the specified partition.
(2) INSERT|OVERWRITE does not work with partitions that have custom locations.

This PR fixes both of these issues for Datasource tables managed by Hive. The behavior for legacy tables or when `manageFilesourcePartitions = false` is unchanged. There is one other issue in that INSERT OVERWRITE with dynamic partitions will overwrite the entire table instead of just the updated partitions, but this behavior is pretty complicated to implement for Datasource tables. We should address that in a future release.

Unit tests.

Author: Eric Liang

Closes #15705 from ericl/sc-4942.

(cherry picked from commit abefe2ec428dc24a4112c623fb6fbe4b2ca60a2b)
Signed-off-by: Reynold Xin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e6509c24
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e6509c24
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e6509c24

Branch: refs/heads/branch-2.1
Commit: e6509c2459e7ece3c3c6bcd143b8cc71f8f4d5c8
Parents: 39d2fdb
Author: Eric Liang
Authored: Wed Nov 2 14:15:10 2016 +0800
Committer: Reynold Xin
Committed: Tue Nov 1 23:23:55 2016 -0700
--
 .../apache/spark/sql/catalyst/dsl/package.scala |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  9 +++-
 .../plans/logical/basicLogicalOperators.scala   | 19 ++-
 .../sql/catalyst/parser/PlanParserSuite.scala   | 15 --
 .../org/apache/spark/sql/DataFrameWriter.scala  |  4 +-
 .../datasources/CatalogFileIndex.scala          |  5 +-
 .../datasources/DataSourceStrategy.scala        | 30 +--
 .../InsertIntoDataSourceCommand.scala           |  6 +--
 .../apache/spark/sql/hive/HiveStrategies.scala  |  3 +-
 .../CreateHiveTableAsSelectCommand.scala        |  5 +-
 .../PartitionProviderCompatibilitySuite.scala   | 52
 11 files changed, 129 insertions(+), 21 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e6509c24/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 66e52ca..e901683 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -367,7 +367,7 @@ package object dsl {
     def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
       InsertIntoTable(
         analysis.UnresolvedRelation(TableIdentifier(tableName)),
-        Map.empty, logicalPlan, overwrite, false)
+        Map.empty, logicalPlan, OverwriteOptions(overwrite), false)
     def as(alias: String): LogicalPlan = logicalPlan match {
       case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias))

http://git-wip-us.apache.org/repos/asf/spark/blob/e6509c24/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 38e9bb6..ac1577b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -177,12 +177,19 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
       throw new ParseException(s"Dynamic partitions do not support IF NOT EXISTS. Specified " +
         "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
     }
+    val overwrite = ctx.OVERWRITE != null
+    val overwritePartition =
+      if (overwrite && partitionKeys.nonEmpty && dynamicPartitionKeys.isEmpty) {
+        Some(partitionKeys.map(t => (t._1, t._2.get)))
+      } else {
+        None
+      }
     InsertIntoTable(
       UnresolvedRelation(tableIdent, None),
       partitionKeys,
       query,
-      ctx.OVERWRITE != null,
+      OverwriteOptions(overwrite, overwritePartition),
       ctx.EXISTS != null)
   }
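As a usage-level illustration of the fixed behavior, here is a hedged sketch (the table name and columns are hypothetical) of the kind of statement that previously clobbered the whole table and now replaces only the named partition for Hive-managed Datasource tables:

```scala
import org.apache.spark.sql.SparkSession

object PartitionOverwriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("PartitionOverwriteExample")
      .enableHiveSupport()
      .getOrCreate()

    // Hypothetical Datasource table whose partitions are tracked by the Hive metastore.
    spark.sql(
      """CREATE TABLE sales (id INT, amount DOUBLE, ds STRING)
        |USING parquet
        |PARTITIONED BY (ds)""".stripMargin)

    spark.sql("INSERT INTO TABLE sales PARTITION (ds = '2016-11-01') SELECT 1, 10.0")
    spark.sql("INSERT INTO TABLE sales PARTITION (ds = '2016-11-02') SELECT 2, 20.0")

    // With this fix only the ds='2016-11-01' partition is replaced;
    // the ds='2016-11-02' partition is left untouched.
    spark.sql("INSERT OVERWRITE TABLE sales PARTITION (ds = '2016-11-01') SELECT 3, 30.0")

    spark.stop()
  }
}
```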
spark git commit: [SPARK-18183][SPARK-18184] Fix INSERT [INTO|OVERWRITE] TABLE ... PARTITION for Datasource tables
Repository: spark
Updated Branches: refs/heads/master 620da3b48 -> abefe2ec4

[SPARK-18183][SPARK-18184] Fix INSERT [INTO|OVERWRITE] TABLE ... PARTITION for Datasource tables

## What changes were proposed in this pull request?

There are a couple issues with the current 2.1 behavior when inserting into Datasource tables with partitions managed by Hive.

(1) OVERWRITE TABLE ... PARTITION will actually overwrite the entire table instead of just the specified partition.
(2) INSERT|OVERWRITE does not work with partitions that have custom locations.

This PR fixes both of these issues for Datasource tables managed by Hive. The behavior for legacy tables or when `manageFilesourcePartitions = false` is unchanged. There is one other issue in that INSERT OVERWRITE with dynamic partitions will overwrite the entire table instead of just the updated partitions, but this behavior is pretty complicated to implement for Datasource tables. We should address that in a future release.

## How was this patch tested?

Unit tests.

Author: Eric Liang

Closes #15705 from ericl/sc-4942.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abefe2ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abefe2ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abefe2ec

Branch: refs/heads/master
Commit: abefe2ec428dc24a4112c623fb6fbe4b2ca60a2b
Parents: 620da3b
Author: Eric Liang
Authored: Wed Nov 2 14:15:10 2016 +0800
Committer: Wenchen Fan
Committed: Wed Nov 2 14:15:10 2016 +0800
--
 .../apache/spark/sql/catalyst/dsl/package.scala |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  9 +++-
 .../plans/logical/basicLogicalOperators.scala   | 19 ++-
 .../sql/catalyst/parser/PlanParserSuite.scala   | 15 --
 .../org/apache/spark/sql/DataFrameWriter.scala  |  4 +-
 .../datasources/CatalogFileIndex.scala          |  5 +-
 .../datasources/DataSourceStrategy.scala        | 30 +--
 .../InsertIntoDataSourceCommand.scala           |  6 +--
 .../apache/spark/sql/hive/HiveStrategies.scala  |  3 +-
 .../CreateHiveTableAsSelectCommand.scala        |  5 +-
 .../PartitionProviderCompatibilitySuite.scala   | 52
 11 files changed, 129 insertions(+), 21 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/abefe2ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 66e52ca..e901683 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -367,7 +367,7 @@ package object dsl {
     def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
       InsertIntoTable(
         analysis.UnresolvedRelation(TableIdentifier(tableName)),
-        Map.empty, logicalPlan, overwrite, false)
+        Map.empty, logicalPlan, OverwriteOptions(overwrite), false)
     def as(alias: String): LogicalPlan = logicalPlan match {
       case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias))

http://git-wip-us.apache.org/repos/asf/spark/blob/abefe2ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 38e9bb6..ac1577b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -177,12 +177,19 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
       throw new ParseException(s"Dynamic partitions do not support IF NOT EXISTS. Specified " +
         "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
     }
+    val overwrite = ctx.OVERWRITE != null
+    val overwritePartition =
+      if (overwrite && partitionKeys.nonEmpty && dynamicPartitionKeys.isEmpty) {
+        Some(partitionKeys.map(t => (t._1, t._2.get)))
+      } else {
+        None
+      }
     InsertIntoTable(
       UnresolvedRelation(tableIdent, None),
       partitionKeys,
       query,
-      ctx.OVERWRITE != null,
+      OverwriteOptions(overwrite, overwritePartition),
       ctx.EXISTS != null)
   }
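The parser change above only records a specific partition to overwrite when every partition value in the spec is static; below is a simplified, hypothetical sketch of that decision (not Spark's actual classes):

```scala
// Hypothetical, simplified version of the option carried on InsertIntoTable.
case class OverwriteOptions(
    enabled: Boolean,
    specificPartition: Option[Map[String, String]] = None)

// partitionKeys maps a partition column to Some(staticValue) or None (dynamic).
def overwriteOptions(
    isOverwrite: Boolean,
    partitionKeys: Map[String, Option[String]]): OverwriteOptions = {
  val allStatic = partitionKeys.nonEmpty && partitionKeys.values.forall(_.isDefined)
  val specific =
    if (isOverwrite && allStatic) {
      Some(partitionKeys.map { case (k, v) => k -> v.get })
    } else {
      None // dynamic partitions (or no spec): fall back to the old, broader overwrite
    }
  OverwriteOptions(isOverwrite, specific)
}

// overwriteOptions(true, Map("ds" -> Some("2016-11-01")))
//   => OverwriteOptions(true, Some(Map("ds" -> "2016-11-01")))
// overwriteOptions(true, Map("ds" -> None)) // dynamic partition value
//   => OverwriteOptions(true, None)
```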
spark git commit: [SPARK-17475][STREAMING] Delete CRC files if the filesystem doesn't use checksum files
Repository: spark
Updated Branches: refs/heads/branch-2.1 1bbf9ff63 -> 39d2fdb51

[SPARK-17475][STREAMING] Delete CRC files if the filesystem doesn't use checksum files

## What changes were proposed in this pull request?

When the metadata logs for various parts of Structured Streaming are stored on non-HDFS filesystems such as NFS or ext4, the HDFSMetadataLog class leaves hidden HDFS-style checksum (CRC) files in the log directory, one file per batch. This PR modifies HDFSMetadataLog so that it detects the use of a filesystem that doesn't use CRC files and removes the CRC files.

## How was this patch tested?

Modified an existing test case in HDFSMetadataLogSuite to check whether HDFSMetadataLog correctly removes CRC files on the local POSIX filesystem. Ran the entire regression suite.

Author: frreiss

Closes #15027 from frreiss/fred-17475.

(cherry picked from commit 620da3b4828b3580c7ed7339b2a07938e6be1bb1)
Signed-off-by: Reynold Xin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39d2fdb5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39d2fdb5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39d2fdb5

Branch: refs/heads/branch-2.1
Commit: 39d2fdb51233ed9b1aaf3adaa3267853f5e58c0f
Parents: 1bbf9ff
Author: frreiss
Authored: Tue Nov 1 23:00:17 2016 -0700
Committer: Reynold Xin
Committed: Tue Nov 1 23:00:28 2016 -0700
--
 .../apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 5 +
 .../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala   | 6 ++
 2 files changed, 11 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/39d2fdb5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index c7235320..9a0f87c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -148,6 +148,11 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
         // It will fail if there is an existing file (someone has committed the batch)
         logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
         fileManager.rename(tempPath, batchIdToPath(batchId))
+
+        // SPARK-17475: HDFSMetadataLog should not leak CRC files
+        // If the underlying filesystem didn't rename the CRC file, delete it.
+        val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
+        if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
         return
       } catch {
         case e: IOException if isFileAlreadyExistsException(e) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/39d2fdb5/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 9c1d26d..d03e08d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -119,6 +119,12 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     assert(metadataLog.get(1).isEmpty)
     assert(metadataLog.get(2).isDefined)
     assert(metadataLog.getLatest().get._1 == 2)
+
+    // There should be exactly one file, called "2", in the metadata directory.
+    // This check also tests for regressions of SPARK-17475
+    val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq
+    assert(allFiles.size == 1)
+    assert(allFiles(0).getName() == "2")
   }
 }
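The five added lines above amount to deriving the name of Hadoop's hidden checksum sibling and deleting it after the rename. Here is a standalone sketch of that idea, assuming the local filesystem and a hypothetical temp path, using only the public Hadoop FileSystem API:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object CrcCleanupSketch {
  // Hadoop's ChecksumFileSystem stores the checksum for "name" as ".name.crc"
  // in the same directory, so the sibling path can be derived and removed.
  def cleanupCrc(fs: FileSystem, renamedTempPath: Path): Unit = {
    val crcPath = new Path(renamedTempPath.getParent, s".${renamedTempPath.getName}.crc")
    if (fs.exists(crcPath)) {
      fs.delete(crcPath, false) // non-recursive delete of the leftover CRC file
    }
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())
    // Hypothetical temp file left behind by a metadata-log write.
    cleanupCrc(fs, new Path("/tmp/metadata-log/.tmp-batch-2"))
  }
}
```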
spark git commit: [SPARK-17475][STREAMING] Delete CRC files if the filesystem doesn't use checksum files
Repository: spark
Updated Branches: refs/heads/master 1bbf9ff63 -> 620da3b48

[SPARK-17475][STREAMING] Delete CRC files if the filesystem doesn't use checksum files

## What changes were proposed in this pull request?

When the metadata logs for various parts of Structured Streaming are stored on non-HDFS filesystems such as NFS or ext4, the HDFSMetadataLog class leaves hidden HDFS-style checksum (CRC) files in the log directory, one file per batch. This PR modifies HDFSMetadataLog so that it detects the use of a filesystem that doesn't use CRC files and removes the CRC files.

## How was this patch tested?

Modified an existing test case in HDFSMetadataLogSuite to check whether HDFSMetadataLog correctly removes CRC files on the local POSIX filesystem. Ran the entire regression suite.

Author: frreiss

Closes #15027 from frreiss/fred-17475.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/620da3b4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/620da3b4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/620da3b4

Branch: refs/heads/master
Commit: 620da3b4828b3580c7ed7339b2a07938e6be1bb1
Parents: 1bbf9ff
Author: frreiss
Authored: Tue Nov 1 23:00:17 2016 -0700
Committer: Reynold Xin
Committed: Tue Nov 1 23:00:17 2016 -0700
--
 .../apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 5 +
 .../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala   | 6 ++
 2 files changed, 11 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/620da3b4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index c7235320..9a0f87c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -148,6 +148,11 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
         // It will fail if there is an existing file (someone has committed the batch)
         logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
         fileManager.rename(tempPath, batchIdToPath(batchId))
+
+        // SPARK-17475: HDFSMetadataLog should not leak CRC files
+        // If the underlying filesystem didn't rename the CRC file, delete it.
+        val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
+        if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
         return
       } catch {
         case e: IOException if isFileAlreadyExistsException(e) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/620da3b4/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 9c1d26d..d03e08d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -119,6 +119,12 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     assert(metadataLog.get(1).isEmpty)
     assert(metadataLog.get(2).isDefined)
     assert(metadataLog.getLatest().get._1 == 2)
+
+    // There should be exactly one file, called "2", in the metadata directory.
+    // This check also tests for regressions of SPARK-17475
+    val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq
+    assert(allFiles.size == 1)
+    assert(allFiles(0).getName() == "2")
   }
 }