svn commit: r26387 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_18_00_01-cce4694-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Apr 18 07:17:58 2018 New Revision: 26387 Log: Apache Spark 2.4.0-SNAPSHOT-2018_04_18_00_01-cce4694 docs [This commit notification would consist of 1457 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-23926][SQL] Extending reverse function to support ArrayType arguments
Repository: spark Updated Branches: refs/heads/master cce469435 -> f81fa478f [SPARK-23926][SQL] Extending reverse function to support ArrayType arguments ## What changes were proposed in this pull request? This PR extends `reverse` functions to be able to operate over array columns and covers: - Introduction of `Reverse` expression that represents logic for reversing arrays and also strings - Removal of `StringReverse` expression - A wrapper for PySpark ## How was this patch tested? New tests added into: - CollectionExpressionsSuite - DataFrameFunctionsSuite ## Codegen examples ### Primitive type ``` val df = Seq( Seq(1, 3, 4, 2), null ).toDF("i") df.filter($"i".isNotNull || $"i".isNull).select(reverse($"i")).debugCodegen ``` Result: ``` /* 032 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0); /* 033 */ ArrayData inputadapter_value = inputadapter_isNull ? /* 034 */ null : (inputadapter_row.getArray(0)); /* 035 */ /* 036 */ boolean filter_value = true; /* 037 */ /* 038 */ if (!(!inputadapter_isNull)) { /* 039 */ filter_value = inputadapter_isNull; /* 040 */ } /* 041 */ if (!filter_value) continue; /* 042 */ /* 043 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1); /* 044 */ /* 045 */ boolean project_isNull = inputadapter_isNull; /* 046 */ ArrayData project_value = null; /* 047 */ /* 048 */ if (!inputadapter_isNull) { /* 049 */ final int project_length = inputadapter_value.numElements(); /* 050 */ project_value = inputadapter_value.copy(); /* 051 */ for(int k = 0; k < project_length / 2; k++) { /* 052 */ int l = project_length - k - 1; /* 053 */ boolean isNullAtK = project_value.isNullAt(k); /* 054 */ boolean isNullAtL = project_value.isNullAt(l); /* 055 */ if(!isNullAtK) { /* 056 */ int el = project_value.getInt(k); /* 057 */ if(!isNullAtL) { /* 058 */ project_value.setInt(k, project_value.getInt(l)); /* 059 */ } else { /* 060 */ project_value.setNullAt(k); /* 061 */ } /* 062 */ project_value.setInt(l, el); /* 063 */ } else if (!isNullAtL) { /* 064 */ project_value.setInt(k, project_value.getInt(l)); /* 065 */ project_value.setNullAt(l); /* 066 */ } /* 067 */ } /* 068 */ /* 069 */ } ``` ### Non-primitive type ``` val df = Seq( Seq("a", "c", "d", "b"), null ).toDF("s") df.filter($"s".isNotNull || $"s".isNull).select(reverse($"s")).debugCodegen ``` Result: ``` /* 032 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0); /* 033 */ ArrayData inputadapter_value = inputadapter_isNull ? /* 034 */ null : (inputadapter_row.getArray(0)); /* 035 */ /* 036 */ boolean filter_value = true; /* 037 */ /* 038 */ if (!(!inputadapter_isNull)) { /* 039 */ filter_value = inputadapter_isNull; /* 040 */ } /* 041 */ if (!filter_value) continue; /* 042 */ /* 043 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1); /* 044 */ /* 045 */ boolean project_isNull = inputadapter_isNull; /* 046 */ ArrayData project_value = null; /* 047 */ /* 048 */ if (!inputadapter_isNull) { /* 049 */ final int project_length = inputadapter_value.numElements(); /* 050 */ project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(new Object[project_length]); /* 051 */ for(int k = 0; k < project_length; k++) { /* 052 */ int l = project_length - k - 1; /* 053 */ project_value.update(k, inputadapter_value.getUTF8String(l)); /* 054 */ } /* 055 */ /* 056 */ } ``` Author: mn-mikke Closes #21034 from mn-mikke/feature/array-api-reverse-to-master. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f81fa478 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f81fa478 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f81fa478 Branch: refs/heads/master Commit: f81fa478ff990146e2a8e463ac252271448d96f5 Parents: cce4694 Author: mn-mikke Authored: Wed Apr 18 18:41:55 2018 +0900 Committer: Takuya UESHIN Committed: Wed Apr 18 18:41:55 2018 +0900 -- python/pyspark/sql/functions.py | 20 - .../catalyst/analysis/FunctionRegistry.scala| 2 +- .../expressions/collectionOperations.scala | 88 ++ .../expressions/stringExpressions.scala | 20 - .../CollectionExpressionsSuite.scala| 44 + .../expressions/StringExpressionsSuite.scala| 6 +- .../scala/org/apache/spark/sql/functions.scala | 15 ++--
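The new overload is easiest to see in a small end-to-end example. The following is a hypothetical usage sketch (the object and app names are invented, and it assumes a build that already contains this patch, i.e. a 2.4.0 snapshot or later); it applies the same `reverse` function to both an array column and a string column:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.reverse

// Hypothetical standalone example, not part of the patch itself.
object ReverseArrayExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("reverse-array-example")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((Seq(1, 3, 4, 2), "abcd")).toDF("nums", "text")

    // The same `reverse` function now handles both ArrayType and StringType inputs:
    // nums -> [2, 4, 3, 1], text -> "dcba".
    df.select(reverse($"nums"), reverse($"text")).show()

    spark.stop()
  }
}
```

As the codegen excerpts above show, primitive arrays are reversed in place on a copied `ArrayData`, while non-primitive arrays are rebuilt through a `GenericArrayData`.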
svn commit: r26391 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_18_04_03-f81fa47-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Apr 18 11:22:58 2018 New Revision: 26391 Log: Apache Spark 2.4.0-SNAPSHOT-2018_04_18_04_03-f81fa47 docs [This commit notification would consist of 1457 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a wrong result by codegen.
Repository: spark Updated Branches: refs/heads/master f81fa478f -> f09a9e941 [SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a wrong result by codegen. ## What changes were proposed in this pull request? `EqualNullSafe` for `FloatType` and `DoubleType` might generate a wrong result by codegen.
```scala
scala> val df = Seq((Some(-1.0d), None), (None, Some(-1.0d))).toDF()
df: org.apache.spark.sql.DataFrame = [_1: double, _2: double]

scala> df.show()
+----+----+
|  _1|  _2|
+----+----+
|-1.0|null|
|null|-1.0|
+----+----+

scala> df.filter("_1 <=> _2").show()
+----+----+
|  _1|  _2|
+----+----+
|-1.0|null|
|null|-1.0|
+----+----+
```
The result should be empty, but two rows remain. ## How was this patch tested? Added a test. Author: Takuya UESHIN Closes #21094 from ueshin/issues/SPARK-24007/equalnullsafe. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f09a9e94 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f09a9e94 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f09a9e94 Branch: refs/heads/master Commit: f09a9e9418c1697d198de18f340b1288f5eb025c Parents: f81fa47 Author: Takuya UESHIN Authored: Wed Apr 18 08:22:05 2018 -0700 Committer: gatorsmile Committed: Wed Apr 18 08:22:05 2018 -0700 -- .../sql/catalyst/expressions/codegen/CodeGenerator.scala | 6 -- .../spark/sql/catalyst/expressions/PredicateSuite.scala | 7 +++ 2 files changed, 11 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f09a9e94/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index f6b6775..cf0a91f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -582,8 +582,10 @@ class CodegenContext { */ def genEqual(dataType: DataType, c1: String, c2: String): String = dataType match { case BinaryType => s"java.util.Arrays.equals($c1, $c2)" -case FloatType => s"(java.lang.Float.isNaN($c1) && java.lang.Float.isNaN($c2)) || $c1 == $c2" -case DoubleType => s"(java.lang.Double.isNaN($c1) && java.lang.Double.isNaN($c2)) || $c1 == $c2" +case FloatType => + s"((java.lang.Float.isNaN($c1) && java.lang.Float.isNaN($c2)) || $c1 == $c2)" +case DoubleType => + s"((java.lang.Double.isNaN($c1) && java.lang.Double.isNaN($c2)) || $c1 == $c2)" case dt: DataType if isPrimitiveType(dt) => s"$c1 == $c2" case dt: DataType if dt.isInstanceOf[AtomicType] => s"$c1.equals($c2)" case array: ArrayType => genComp(array, c1, c2) + " == 0" http://git-wip-us.apache.org/repos/asf/spark/blob/f09a9e94/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala index 8a8f8e1..1bfd180 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala @@ -442,4 +442,11 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper { InSet(Literal(1), Set(1, 2, 3, 4)).genCode(ctx)
assert(ctx.inlinedMutableStates.isEmpty) } + + test("SPARK-24007: EqualNullSafe for FloatType and DoubleType might generate a wrong result") { +checkEvaluation(EqualNullSafe(Literal(null, FloatType), Literal(-1.0f)), false) +checkEvaluation(EqualNullSafe(Literal(-1.0f), Literal(null, FloatType)), false) +checkEvaluation(EqualNullSafe(Literal(null, DoubleType), Literal(-1.0d)), false) +checkEvaluation(EqualNullSafe(Literal(-1.0d), Literal(null, DoubleType)), false) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
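To see why the added parentheses matter, here is a minimal sketch, not Spark's actual generated code. It assumes `EqualNullSafe` splices the `genEqual` snippet into a template of the form `(isNull1 && isNull2) || (!isNull1 && !isNull2 && <snippet>)`, and that the value slot for a NULL input holds a placeholder default (assumed here to be -1.0, which is why the repro above uses -1.0). Because `&&` binds tighter than `||`, the unparenthesized snippet lets `c1 == c2` escape the null guard:

```scala
// Illustrative only: variable names and the template shape are assumptions, not Spark source.
object NullSafeEqualPrecedenceSketch {
  def main(args: Array[String]): Unit = {
    val isNull1 = false; val c1 = -1.0 // left operand: -1.0
    val isNull2 = true;  val c2 = -1.0 // right operand: NULL; the slot holds a placeholder default

    // Before the fix: `&&` binds tighter than `||`, so `c1 == c2` is evaluated even though
    // one side is null, and the row wrongly satisfies `_1 <=> _2`.
    val beforeFix = (isNull1 && isNull2) ||
      (!isNull1 && !isNull2 && (java.lang.Double.isNaN(c1) && java.lang.Double.isNaN(c2)) || c1 == c2)

    // After the fix: the whole equality snippet is parenthesized, so it only contributes
    // when both sides are non-null.
    val afterFix = (isNull1 && isNull2) ||
      (!isNull1 && !isNull2 && ((java.lang.Double.isNaN(c1) && java.lang.Double.isNaN(c2)) || c1 == c2))

    println(s"before fix: $beforeFix, after fix: $afterFix") // before fix: true, after fix: false
  }
}
```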
spark git commit: [SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a wrong result by codegen.
Repository: spark Updated Branches: refs/heads/branch-2.3 6b99d5bc3 -> a1c56b669 [SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a wrong result by codegen. ## What changes were proposed in this pull request? `EqualNullSafe` for `FloatType` and `DoubleType` might generate a wrong result by codegen.
```scala
scala> val df = Seq((Some(-1.0d), None), (None, Some(-1.0d))).toDF()
df: org.apache.spark.sql.DataFrame = [_1: double, _2: double]

scala> df.show()
+----+----+
|  _1|  _2|
+----+----+
|-1.0|null|
|null|-1.0|
+----+----+

scala> df.filter("_1 <=> _2").show()
+----+----+
|  _1|  _2|
+----+----+
|-1.0|null|
|null|-1.0|
+----+----+
```
The result should be empty, but two rows remain. ## How was this patch tested? Added a test. Author: Takuya UESHIN Closes #21094 from ueshin/issues/SPARK-24007/equalnullsafe. (cherry picked from commit f09a9e9418c1697d198de18f340b1288f5eb025c) Signed-off-by: gatorsmile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1c56b66 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1c56b66 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1c56b66 Branch: refs/heads/branch-2.3 Commit: a1c56b66970a683e458e3f44fd6788110e869093 Parents: 6b99d5b Author: Takuya UESHIN Authored: Wed Apr 18 08:22:05 2018 -0700 Committer: gatorsmile Committed: Wed Apr 18 08:22:16 2018 -0700 -- .../sql/catalyst/expressions/codegen/CodeGenerator.scala | 6 -- .../spark/sql/catalyst/expressions/PredicateSuite.scala | 7 +++ 2 files changed, 11 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a1c56b66/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 504e851..9cf5839 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -764,8 +764,10 @@ class CodegenContext { */ def genEqual(dataType: DataType, c1: String, c2: String): String = dataType match { case BinaryType => s"java.util.Arrays.equals($c1, $c2)" -case FloatType => s"(java.lang.Float.isNaN($c1) && java.lang.Float.isNaN($c2)) || $c1 == $c2" -case DoubleType => s"(java.lang.Double.isNaN($c1) && java.lang.Double.isNaN($c2)) || $c1 == $c2" +case FloatType => + s"((java.lang.Float.isNaN($c1) && java.lang.Float.isNaN($c2)) || $c1 == $c2)" +case DoubleType => + s"((java.lang.Double.isNaN($c1) && java.lang.Double.isNaN($c2)) || $c1 == $c2)" case dt: DataType if isPrimitiveType(dt) => s"$c1 == $c2" case dt: DataType if dt.isInstanceOf[AtomicType] => s"$c1.equals($c2)" case array: ArrayType => genComp(array, c1, c2) + " == 0" http://git-wip-us.apache.org/repos/asf/spark/blob/a1c56b66/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala index 8a8f8e1..1bfd180 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala @@ -442,4 +442,11 @@ class
PredicateSuite extends SparkFunSuite with ExpressionEvalHelper { InSet(Literal(1), Set(1, 2, 3, 4)).genCode(ctx) assert(ctx.inlinedMutableStates.isEmpty) } + + test("SPARK-24007: EqualNullSafe for FloatType and DoubleType might generate a wrong result") { +checkEvaluation(EqualNullSafe(Literal(null, FloatType), Literal(-1.0f)), false) +checkEvaluation(EqualNullSafe(Literal(-1.0f), Literal(null, FloatType)), false) +checkEvaluation(EqualNullSafe(Literal(null, DoubleType), Literal(-1.0d)), false) +checkEvaluation(EqualNullSafe(Literal(-1.0d), Literal(null, DoubleType)), false) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a wrong result by codegen.
Repository: spark Updated Branches: refs/heads/branch-2.2 e957c4e88 -> a902323fb [SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a wrong result by codegen. `EqualNullSafe` for `FloatType` and `DoubleType` might generate a wrong result by codegen.
```scala
scala> val df = Seq((Some(-1.0d), None), (None, Some(-1.0d))).toDF()
df: org.apache.spark.sql.DataFrame = [_1: double, _2: double]

scala> df.show()
+----+----+
|  _1|  _2|
+----+----+
|-1.0|null|
|null|-1.0|
+----+----+

scala> df.filter("_1 <=> _2").show()
+----+----+
|  _1|  _2|
+----+----+
|-1.0|null|
|null|-1.0|
+----+----+
```
The result should be empty, but two rows remain. Added a test. Author: Takuya UESHIN Closes #21094 from ueshin/issues/SPARK-24007/equalnullsafe. (cherry picked from commit f09a9e9418c1697d198de18f340b1288f5eb025c) Signed-off-by: gatorsmile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a902323f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a902323f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a902323f Branch: refs/heads/branch-2.2 Commit: a902323fbf7be27a7ca747105eedd61b1d57b9d4 Parents: e957c4e Author: Takuya UESHIN Authored: Wed Apr 18 08:22:05 2018 -0700 Committer: gatorsmile Committed: Wed Apr 18 08:23:46 2018 -0700 -- .../sql/catalyst/expressions/codegen/CodeGenerator.scala | 6 -- .../spark/sql/catalyst/expressions/PredicateSuite.scala | 7 +++ 2 files changed, 11 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a902323f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 3964471..9e5eaf6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -482,8 +482,10 @@ class CodegenContext { */ def genEqual(dataType: DataType, c1: String, c2: String): String = dataType match { case BinaryType => s"java.util.Arrays.equals($c1, $c2)" -case FloatType => s"(java.lang.Float.isNaN($c1) && java.lang.Float.isNaN($c2)) || $c1 == $c2" -case DoubleType => s"(java.lang.Double.isNaN($c1) && java.lang.Double.isNaN($c2)) || $c1 == $c2" +case FloatType => + s"((java.lang.Float.isNaN($c1) && java.lang.Float.isNaN($c2)) || $c1 == $c2)" +case DoubleType => + s"((java.lang.Double.isNaN($c1) && java.lang.Double.isNaN($c2)) || $c1 == $c2)" case dt: DataType if isPrimitiveType(dt) => s"$c1 == $c2" case dt: DataType if dt.isInstanceOf[AtomicType] => s"$c1.equals($c2)" case array: ArrayType => genComp(array, c1, c2) + " == 0" http://git-wip-us.apache.org/repos/asf/spark/blob/a902323f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala index bf3b184..15ae624 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala @@ -340,4 +340,11 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper { val infinity =
Literal(Double.PositiveInfinity) checkEvaluation(EqualTo(infinity, infinity), true) } + + test("SPARK-24007: EqualNullSafe for FloatType and DoubleType might generate a wrong result") { +checkEvaluation(EqualNullSafe(Literal(null, FloatType), Literal(-1.0f)), false) +checkEvaluation(EqualNullSafe(Literal(-1.0f), Literal(null, FloatType)), false) +checkEvaluation(EqualNullSafe(Literal(null, DoubleType), Literal(-1.0d)), false) +checkEvaluation(EqualNullSafe(Literal(-1.0d), Literal(null, DoubleType)), false) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23963][SQL] Properly handle large number of columns in query on text-based Hive table
Repository: spark Updated Branches: refs/heads/branch-2.3 a1c56b669 -> 5bcb7bdcc [SPARK-23963][SQL] Properly handle large number of columns in query on text-based Hive table ## What changes were proposed in this pull request? TableReader would get disproportionately slower as the number of columns in the query increased. I fixed the way TableReader was looking up metadata for each column in the row. Previously, it had been looking up this data in linked lists, accessing each linked list by an index (column number). Now it looks up this data in arrays, where indexing by column number works better. ## How was this patch tested? Manual testing All sbt unit tests python sql tests Author: Bruce Robbins Closes #21043 from bersprockets/tabreadfix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5bcb7bdc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5bcb7bdc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5bcb7bdc Branch: refs/heads/branch-2.3 Commit: 5bcb7bdccf967ff5ad3d8c76f4ad8c9c4031e7c2 Parents: a1c56b6 Author: Bruce Robbins Authored: Fri Apr 13 14:05:04 2018 -0700 Committer: gatorsmile Committed: Wed Apr 18 09:48:49 2018 -0700 -- .../src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5bcb7bdc/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index cc8907a..b5444a4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -381,7 +381,7 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging { val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) => soi.getStructFieldRef(attr.name) -> ordinal -}.unzip +}.toArray.unzip /** * Builds specific unwrappers ahead of time according to object inspector - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
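The quadratic behavior the description refers to comes from positional access: indexing into a linked list is O(i), so touching every column of a row costs O(n^2) in the column count, while an array lookup is O(1). The following is a generic, self-contained illustration of that difference (not Spark code; the object name and column count are arbitrary):

```scala
// Compares per-row column lookups against a List (O(i) per access) and an Array (O(1) per access).
object IndexedAccessSketch {
  def time[A](label: String)(body: => A): A = {
    val start = System.nanoTime()
    val result = body
    println(f"$label%-6s ${(System.nanoTime() - start) / 1e6}%.1f ms")
    result
  }

  def main(args: Array[String]): Unit = {
    val numColumns = 20000
    val fieldsAsList: List[Int]   = List.tabulate(numColumns)(identity)
    val fieldsAsArray: Array[Int] = Array.tabulate(numColumns)(identity)

    // Simulate unwrapping one wide row: visit every column ordinal once.
    time("List")  { var sum = 0L; var i = 0; while (i < numColumns) { sum += fieldsAsList(i);  i += 1 }; sum }
    time("Array") { var sum = 0L; var i = 0; while (i < numColumns) { sum += fieldsAsArray(i); i += 1 }; sum }
  }
}
```

The patch keeps the same lookup code but materializes the per-column metadata into arrays up front (`.toArray.unzip`), so each access is constant-time.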
spark git commit: [SPARK-23963][SQL] Properly handle large number of columns in query on text-based Hive table
Repository: spark Updated Branches: refs/heads/branch-2.2 a902323fb -> 041aec4e1 [SPARK-23963][SQL] Properly handle large number of columns in query on text-based Hive table ## What changes were proposed in this pull request? TableReader would get disproportionately slower as the number of columns in the query increased. I fixed the way TableReader was looking up metadata for each column in the row. Previously, it had been looking up this data in linked lists, accessing each linked list by an index (column number). Now it looks up this data in arrays, where indexing by column number works better. ## How was this patch tested? Manual testing All sbt unit tests python sql tests Author: Bruce Robbins Closes #21043 from bersprockets/tabreadfix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/041aec4e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/041aec4e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/041aec4e Branch: refs/heads/branch-2.2 Commit: 041aec4e1bfb4f3c2d4db6761486f3523102c75e Parents: a902323 Author: Bruce Robbins Authored: Fri Apr 13 14:05:04 2018 -0700 Committer: gatorsmile Committed: Wed Apr 18 09:50:13 2018 -0700 -- .../src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/041aec4e/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index a0e379f..11795ff 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -381,7 +381,7 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging { val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) => soi.getStructFieldRef(attr.name) -> ordinal -}.unzip +}.toArray.unzip /** * Builds specific unwrappers ahead of time according to object inspector - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r26400 - in /dev/spark/2.3.1-SNAPSHOT-2018_04_18_10_01-5bcb7bd-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Apr 18 17:16:04 2018 New Revision: 26400 Log: Apache Spark 2.3.1-SNAPSHOT-2018_04_18_10_01-5bcb7bd docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
svn commit: r26402 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_18_12_02-f09a9e9-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Apr 18 19:16:09 2018 New Revision: 26402 Log: Apache Spark 2.4.0-SNAPSHOT-2018_04_18_12_02-f09a9e9 docs [This commit notification would consist of 1457 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-23875][SQL][FOLLOWUP] Add IndexedSeq wrapper for ArrayData
Repository: spark Updated Branches: refs/heads/master f09a9e941 -> a9066478f [SPARK-23875][SQL][FOLLOWUP] Add IndexedSeq wrapper for ArrayData ## What changes were proposed in this pull request? Use specified accessor in `ArrayData.foreach` and `toArray`. ## How was this patch tested? Existing tests. Author: Liang-Chi Hsieh Closes #21099 from viirya/SPARK-23875-followup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a9066478 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a9066478 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a9066478 Branch: refs/heads/master Commit: a9066478f6d98c3ae634c3bb9b09ee20bd60e111 Parents: f09a9e9 Author: Liang-Chi Hsieh Authored: Thu Apr 19 00:05:47 2018 +0200 Committer: Herman van Hovell Committed: Thu Apr 19 00:05:47 2018 +0200 -- .../scala/org/apache/spark/sql/catalyst/util/ArrayData.scala | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a9066478/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayData.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayData.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayData.scala index 2cf59d5..104b428 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayData.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayData.scala @@ -141,28 +141,29 @@ abstract class ArrayData extends SpecializedGetters with Serializable { def toArray[T: ClassTag](elementType: DataType): Array[T] = { val size = numElements() +val accessor = InternalRow.getAccessor(elementType) val values = new Array[T](size) var i = 0 while (i < size) { if (isNullAt(i)) { values(i) = null.asInstanceOf[T] } else { -values(i) = get(i, elementType).asInstanceOf[T] +values(i) = accessor(this, i).asInstanceOf[T] } i += 1 } values } - // todo: specialize this. def foreach(elementType: DataType, f: (Int, Any) => Unit): Unit = { val size = numElements() +val accessor = InternalRow.getAccessor(elementType) var i = 0 while (i < size) { if (isNullAt(i)) { f(i, null) } else { -f(i, get(i, elementType)) +f(i, accessor(this, i)) } i += 1 } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
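The change follows a simple pattern: resolve the type-specific accessor once per call instead of re-dispatching on the `DataType` for every element. Below is a stripped-down sketch of that pattern with illustrative types and names; it is not Spark's `ArrayData` API:

```scala
object AccessorHoistingSketch {
  sealed trait DataType
  case object IntType extends DataType
  case object StringType extends DataType

  trait Container {
    def numElements: Int
    def getInt(i: Int): Any
    def getString(i: Int): Any
  }

  // Per-element dispatch: the type match runs numElements times.
  def toSeqSlow(c: Container, dt: DataType): Seq[Any] =
    (0 until c.numElements).map { i =>
      dt match {
        case IntType    => c.getInt(i)
        case StringType => c.getString(i)
      }
    }

  // Hoisted dispatch (the approach this patch takes): the match runs once and yields
  // an accessor function that the loop then reuses for every element.
  def toSeqFast(c: Container, dt: DataType): Seq[Any] = {
    val accessor: (Container, Int) => Any = dt match {
      case IntType    => (cont, i) => cont.getInt(i)
      case StringType => (cont, i) => cont.getString(i)
    }
    (0 until c.numElements).map(i => accessor(c, i))
  }
}
```

In the patch itself, the accessor comes from `InternalRow.getAccessor(elementType)` and is resolved once before the `while` loop, as the diff shows.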
svn commit: r26405 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_18_16_01-a906647-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Apr 18 23:16:22 2018 New Revision: 26405 Log: Apache Spark 2.4.0-SNAPSHOT-2018_04_18_16_01-a906647 docs [This commit notification would consist of 1457 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-23775][TEST] Make DataFrameRangeSuite not flaky
Repository: spark Updated Branches: refs/heads/master a9066478f -> 0c94e48bc [SPARK-23775][TEST] Make DataFrameRangeSuite not flaky ## What changes were proposed in this pull request? DataFrameRangeSuite.test("Cancelling stage in a query with Range.") stays sometimes in an infinite loop and times out the build. There were multiple issues with the test: 1. The first valid stageId is zero when the test started alone and not in a suite and the following code waits until timeout: ``` eventually(timeout(10.seconds), interval(1.millis)) { assert(DataFrameRangeSuite.stageToKill > 0) } ``` 2. The `DataFrameRangeSuite.stageToKill` was overwritten by the task's thread after the reset which ended up in canceling the same stage 2 times. This caused the infinite wait. This PR solves this mentioned flakyness by removing the shared `DataFrameRangeSuite.stageToKill` and using `wait` and `CountDownLatch` for synhronization. ## How was this patch tested? Existing unit test. Author: Gabor Somogyi Closes #20888 from gaborgsomogyi/SPARK-23775. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c94e48b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c94e48b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c94e48b Branch: refs/heads/master Commit: 0c94e48bc50717e1627c0d2acd5382d9adc73c97 Parents: a906647 Author: Gabor Somogyi Authored: Wed Apr 18 16:37:41 2018 -0700 Committer: Marcelo Vanzin Committed: Wed Apr 18 16:37:41 2018 -0700 -- .../apache/spark/sql/DataFrameRangeSuite.scala | 78 +++- 1 file changed, 45 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0c94e48b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala index 57a930d..a0fd740 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala @@ -17,14 +17,16 @@ package org.apache.spark.sql +import java.util.concurrent.{CountDownLatch, TimeUnit} + import scala.concurrent.duration._ import scala.math.abs import scala.util.Random import org.scalatest.concurrent.Eventually -import org.apache.spark.{SparkException, TaskContext} -import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} +import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -152,39 +154,53 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall } test("Cancelling stage in a query with Range.") { -val listener = new SparkListener { - override def onJobStart(jobStart: SparkListenerJobStart): Unit = { -eventually(timeout(10.seconds), interval(1.millis)) { - assert(DataFrameRangeSuite.stageToKill > 0) +// Save and restore the value because SparkContext is shared +val savedInterruptOnCancel = sparkContext + .getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL) + +try { + sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true") + + for (codegen <- Seq(true, false)) { +// This countdown latch used to make sure with all the stages cancelStage called in listener +val latch = new CountDownLatch(2) + +val listener = new 
SparkListener { + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { +sparkContext.cancelStage(taskStart.stageId) +latch.countDown() + } } -sparkContext.cancelStage(DataFrameRangeSuite.stageToKill) - } -} -sparkContext.addSparkListener(listener) -for (codegen <- Seq(true, false)) { - withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) { -DataFrameRangeSuite.stageToKill = -1 -val ex = intercept[SparkException] { - spark.range(0, 1000L, 1, 1).map { x => -DataFrameRangeSuite.stageToKill = TaskContext.get().stageId() -x - }.toDF("id").agg(sum("id")).collect() +sparkContext.addSparkListener(listener) +withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) { + val ex = intercept[SparkException] { +sparkContext.range(0, 1L, numSlices = 10).mapPartitions { x => + x.synchronized { +x.wait() +
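The key change is the synchronization primitive: instead of polling a shared mutable stage id, the test thread blocks on a `CountDownLatch` that the listener callbacks count down. Here is a minimal, standalone illustration of that pattern, in which plain threads stand in for Spark's listener bus; it is not the actual suite code:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object LatchCoordinationSketch {
  def main(args: Array[String]): Unit = {
    // One count per expected callback (the suite uses 2, one per cancelled stage).
    val latch = new CountDownLatch(2)

    // Stand-ins for onTaskStart callbacks firing on a listener thread.
    (1 to 2).foreach { i =>
      new Thread(new Runnable {
        override def run(): Unit = {
          println(s"callback $i: cancel its stage, then count down")
          latch.countDown()
        }
      }).start()
    }

    // Stand-in for the test body: wait deterministically instead of polling shared state.
    val sawAllCallbacks = latch.await(10, TimeUnit.SECONDS)
    println(s"all callbacks observed: $sawAllCallbacks")
  }
}
```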
spark git commit: [SPARK-23775][TEST] Make DataFrameRangeSuite not flaky
Repository: spark Updated Branches: refs/heads/branch-2.3 5bcb7bdcc -> 130641102 [SPARK-23775][TEST] Make DataFrameRangeSuite not flaky ## What changes were proposed in this pull request? DataFrameRangeSuite.test("Cancelling stage in a query with Range.") stays sometimes in an infinite loop and times out the build. There were multiple issues with the test: 1. The first valid stageId is zero when the test started alone and not in a suite and the following code waits until timeout: ``` eventually(timeout(10.seconds), interval(1.millis)) { assert(DataFrameRangeSuite.stageToKill > 0) } ``` 2. The `DataFrameRangeSuite.stageToKill` was overwritten by the task's thread after the reset which ended up in canceling the same stage 2 times. This caused the infinite wait. This PR solves this mentioned flakyness by removing the shared `DataFrameRangeSuite.stageToKill` and using `wait` and `CountDownLatch` for synhronization. ## How was this patch tested? Existing unit test. Author: Gabor Somogyi Closes #20888 from gaborgsomogyi/SPARK-23775. (cherry picked from commit 0c94e48bc50717e1627c0d2acd5382d9adc73c97) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13064110 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13064110 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13064110 Branch: refs/heads/branch-2.3 Commit: 130641102ceecf2a795d7f0dc6412c7e56eb03a8 Parents: 5bcb7bd Author: Gabor Somogyi Authored: Wed Apr 18 16:37:41 2018 -0700 Committer: Marcelo Vanzin Committed: Wed Apr 18 16:37:52 2018 -0700 -- .../apache/spark/sql/DataFrameRangeSuite.scala | 78 +++- 1 file changed, 45 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/13064110/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala index 57a930d..a0fd740 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala @@ -17,14 +17,16 @@ package org.apache.spark.sql +import java.util.concurrent.{CountDownLatch, TimeUnit} + import scala.concurrent.duration._ import scala.math.abs import scala.util.Random import org.scalatest.concurrent.Eventually -import org.apache.spark.{SparkException, TaskContext} -import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} +import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -152,39 +154,53 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall } test("Cancelling stage in a query with Range.") { -val listener = new SparkListener { - override def onJobStart(jobStart: SparkListenerJobStart): Unit = { -eventually(timeout(10.seconds), interval(1.millis)) { - assert(DataFrameRangeSuite.stageToKill > 0) +// Save and restore the value because SparkContext is shared +val savedInterruptOnCancel = sparkContext + .getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL) + +try { + sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true") + + for (codegen <- Seq(true, false)) { +// This countdown latch used to make sure with 
all the stages cancelStage called in listener +val latch = new CountDownLatch(2) + +val listener = new SparkListener { + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { +sparkContext.cancelStage(taskStart.stageId) +latch.countDown() + } } -sparkContext.cancelStage(DataFrameRangeSuite.stageToKill) - } -} -sparkContext.addSparkListener(listener) -for (codegen <- Seq(true, false)) { - withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) { -DataFrameRangeSuite.stageToKill = -1 -val ex = intercept[SparkException] { - spark.range(0, 1000L, 1, 1).map { x => -DataFrameRangeSuite.stageToKill = TaskContext.get().stageId() -x - }.toDF("id").agg(sum("id")).collect() +sparkContext.addSparkListener(listener) +withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) { + val ex = intercept[SparkException] { +sparkContext.range(0, 1L, n
spark git commit: [SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingListener
Repository: spark Updated Branches: refs/heads/branch-2.3 130641102 -> 32bec6ca3 [SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingListener ## What changes were proposed in this pull request? The `StreamingListener` in PySpark side seems to be lack of `onStreamingStarted` method. This patch adds it and a test for it. This patch also includes a trivial doc improvement for `createDirectStream`. Original PR is #21057. ## How was this patch tested? Added test. Author: Liang-Chi Hsieh Closes #21098 from viirya/SPARK-24014. (cherry picked from commit 8bb0df2c65355dfdcd28e362ff661c6c7ebc99c0) Signed-off-by: jerryshao Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32bec6ca Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32bec6ca Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32bec6ca Branch: refs/heads/branch-2.3 Commit: 32bec6ca3d9e47587c84f928d4166475fe29f596 Parents: 1306411 Author: Liang-Chi Hsieh Authored: Thu Apr 19 10:00:57 2018 +0800 Committer: jerryshao Committed: Thu Apr 19 10:01:13 2018 +0800 -- python/pyspark/streaming/kafka.py| 3 ++- python/pyspark/streaming/listener.py | 6 ++ python/pyspark/streaming/tests.py| 7 +++ 3 files changed, 15 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/32bec6ca/python/pyspark/streaming/kafka.py -- diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py index fdb9308..ed2e0e7 100644 --- a/python/pyspark/streaming/kafka.py +++ b/python/pyspark/streaming/kafka.py @@ -104,7 +104,8 @@ class KafkaUtils(object): :param topics: list of topic_name to consume. :param kafkaParams: Additional params for Kafka. :param fromOffsets: Per-topic/partition Kafka offsets defining the (inclusive) starting -point of the stream. +point of the stream (a dictionary mapping `TopicAndPartition` to +integers). :param keyDecoder: A function used to decode key (default is utf8_decoder). :param valueDecoder: A function used to decode value (default is utf8_decoder). :param messageHandler: A function used to convert KafkaMessageAndMetadata. You can assess http://git-wip-us.apache.org/repos/asf/spark/blob/32bec6ca/python/pyspark/streaming/listener.py -- diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py index b830797..d4ecc21 100644 --- a/python/pyspark/streaming/listener.py +++ b/python/pyspark/streaming/listener.py @@ -23,6 +23,12 @@ class StreamingListener(object): def __init__(self): pass +def onStreamingStarted(self, streamingStarted): +""" +Called when the streaming has been started. 
+""" +pass + def onReceiverStarted(self, receiverStarted): """ Called when a receiver has been started http://git-wip-us.apache.org/repos/asf/spark/blob/32bec6ca/python/pyspark/streaming/tests.py -- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index ca28c9b..1ec418a 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -507,6 +507,10 @@ class StreamingListenerTests(PySparkStreamingTestCase): self.batchInfosCompleted = [] self.batchInfosStarted = [] self.batchInfosSubmitted = [] +self.streamingStartedTime = [] + +def onStreamingStarted(self, streamingStarted): +self.streamingStartedTime.append(streamingStarted.time) def onBatchSubmitted(self, batchSubmitted): self.batchInfosSubmitted.append(batchSubmitted.batchInfo()) @@ -530,9 +534,12 @@ class StreamingListenerTests(PySparkStreamingTestCase): batchInfosSubmitted = batch_collector.batchInfosSubmitted batchInfosStarted = batch_collector.batchInfosStarted batchInfosCompleted = batch_collector.batchInfosCompleted +streamingStartedTime = batch_collector.streamingStartedTime self.wait_for(batchInfosCompleted, 4) +self.assertEqual(len(streamingStartedTime), 1) + self.assertGreaterEqual(len(batchInfosSubmitted), 4) for info in batchInfosSubmitted: self.assertGreaterEqual(info.batchTime().milliseconds(), 0) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingListener
Repository: spark Updated Branches: refs/heads/master 0c94e48bc -> 8bb0df2c6 [SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingListener ## What changes were proposed in this pull request? The `StreamingListener` in PySpark side seems to be lack of `onStreamingStarted` method. This patch adds it and a test for it. This patch also includes a trivial doc improvement for `createDirectStream`. Original PR is #21057. ## How was this patch tested? Added test. Author: Liang-Chi Hsieh Closes #21098 from viirya/SPARK-24014. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8bb0df2c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8bb0df2c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8bb0df2c Branch: refs/heads/master Commit: 8bb0df2c65355dfdcd28e362ff661c6c7ebc99c0 Parents: 0c94e48 Author: Liang-Chi Hsieh Authored: Thu Apr 19 10:00:57 2018 +0800 Committer: jerryshao Committed: Thu Apr 19 10:00:57 2018 +0800 -- python/pyspark/streaming/kafka.py| 3 ++- python/pyspark/streaming/listener.py | 6 ++ python/pyspark/streaming/tests.py| 7 +++ 3 files changed, 15 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8bb0df2c/python/pyspark/streaming/kafka.py -- diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py index fdb9308..ed2e0e7 100644 --- a/python/pyspark/streaming/kafka.py +++ b/python/pyspark/streaming/kafka.py @@ -104,7 +104,8 @@ class KafkaUtils(object): :param topics: list of topic_name to consume. :param kafkaParams: Additional params for Kafka. :param fromOffsets: Per-topic/partition Kafka offsets defining the (inclusive) starting -point of the stream. +point of the stream (a dictionary mapping `TopicAndPartition` to +integers). :param keyDecoder: A function used to decode key (default is utf8_decoder). :param valueDecoder: A function used to decode value (default is utf8_decoder). :param messageHandler: A function used to convert KafkaMessageAndMetadata. You can assess http://git-wip-us.apache.org/repos/asf/spark/blob/8bb0df2c/python/pyspark/streaming/listener.py -- diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py index b830797..d4ecc21 100644 --- a/python/pyspark/streaming/listener.py +++ b/python/pyspark/streaming/listener.py @@ -23,6 +23,12 @@ class StreamingListener(object): def __init__(self): pass +def onStreamingStarted(self, streamingStarted): +""" +Called when the streaming has been started. 
+""" +pass + def onReceiverStarted(self, receiverStarted): """ Called when a receiver has been started http://git-wip-us.apache.org/repos/asf/spark/blob/8bb0df2c/python/pyspark/streaming/tests.py -- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 7dde7c0..1039409 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -507,6 +507,10 @@ class StreamingListenerTests(PySparkStreamingTestCase): self.batchInfosCompleted = [] self.batchInfosStarted = [] self.batchInfosSubmitted = [] +self.streamingStartedTime = [] + +def onStreamingStarted(self, streamingStarted): +self.streamingStartedTime.append(streamingStarted.time) def onBatchSubmitted(self, batchSubmitted): self.batchInfosSubmitted.append(batchSubmitted.batchInfo()) @@ -530,9 +534,12 @@ class StreamingListenerTests(PySparkStreamingTestCase): batchInfosSubmitted = batch_collector.batchInfosSubmitted batchInfosStarted = batch_collector.batchInfosStarted batchInfosCompleted = batch_collector.batchInfosCompleted +streamingStartedTime = batch_collector.streamingStartedTime self.wait_for(batchInfosCompleted, 4) +self.assertEqual(len(streamingStartedTime), 1) + self.assertGreaterEqual(len(batchInfosSubmitted), 4) for info in batchInfosSubmitted: self.assertGreaterEqual(info.batchTime().milliseconds(), 0) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23919][SQL] Add array_position function
Repository: spark Updated Branches: refs/heads/master 8bb0df2c6 -> d5bec48b9 [SPARK-23919][SQL] Add array_position function ## What changes were proposed in this pull request? The PR adds the SQL function `array_position`. The behavior of the function is based on Presto's one. The function returns the position of the first occurrence of the element in array x (or 0 if not found) using 1-based index as BigInt. ## How was this patch tested? Added UTs Author: Kazuaki Ishizaki Closes #21037 from kiszk/SPARK-23919. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5bec48b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5bec48b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5bec48b Branch: refs/heads/master Commit: d5bec48b9cb225c19b43935c07b24090c51cacce Parents: 8bb0df2 Author: Kazuaki Ishizaki Authored: Thu Apr 19 11:59:17 2018 +0900 Committer: Takuya UESHIN Committed: Thu Apr 19 11:59:17 2018 +0900 -- python/pyspark/sql/functions.py | 17 ++ .../catalyst/analysis/FunctionRegistry.scala| 1 + .../expressions/collectionOperations.scala | 56 .../CollectionExpressionsSuite.scala| 22 .../scala/org/apache/spark/sql/functions.scala | 14 + .../spark/sql/DataFrameFunctionsSuite.scala | 34 6 files changed, 144 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d5bec48b/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index d3bb0a5..36dcabc 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1845,6 +1845,23 @@ def array_contains(col, value): return Column(sc._jvm.functions.array_contains(_to_java_column(col), value)) +@since(2.4) +def array_position(col, value): +""" +Collection function: Locates the position of the first occurrence of the given value +in the given array. Returns null if either of the arguments are null. + +.. note:: The position is not zero based, but 1 based index. Returns 0 if the given +value could not be found in the array. + +>>> df = spark.createDataFrame([(["c", "b", "a"],), ([],)], ['data']) +>>> df.select(array_position(df.data, "a")).collect() +[Row(array_position(data, a)=3), Row(array_position(data, a)=0)] +""" +sc = SparkContext._active_spark_context +return Column(sc._jvm.functions.array_position(_to_java_column(col), value)) + + @since(1.4) def explode(col): """Returns a new row for each element in the given array or map. 
http://git-wip-us.apache.org/repos/asf/spark/blob/d5bec48b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 38c874a..74095fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -402,6 +402,7 @@ object FunctionRegistry { // collection functions expression[CreateArray]("array"), expression[ArrayContains]("array_contains"), +expression[ArrayPosition]("array_position"), expression[CreateMap]("map"), expression[CreateNamedStruct]("named_struct"), expression[MapKeys]("map_keys"), http://git-wip-us.apache.org/repos/asf/spark/blob/d5bec48b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index 76b71f5..e6a05f5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -505,3 +505,59 @@ case class ArrayMax(child: Expression) extends UnaryExpression with ImplicitCast override def prettyName: String = "array_max" } + + +/** + * Returns the position of the first occurrence of element in the given array as long. + * Returns 0 if the given value could not be found in the array. Returns null if either of + * the arguments are null + * + * NOTE: that this is not zero based, but 1-based index. The first element in
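A hypothetical end-to-end usage sketch of the new function in Scala (the object and app names are invented; it assumes a build containing this patch, i.e. a 2.4.0 snapshot or later). The result is a 1-based position, with 0 meaning the element was not found:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_position, col}

object ArrayPositionExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("array-position-example")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(Seq("c", "b", "a"), Seq.empty[String]).toDF("data")

    // "a" is the third element of the first array (1-based index => 3);
    // it is absent from the empty array (=> 0).
    df.select(array_position(col("data"), "a")).show()

    spark.stop()
  }
}
```

The PySpark wrapper added in this patch behaves the same way, as the doctest in `functions.py` above shows.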
svn commit: r26412 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_18_20_01-d5bec48-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Apr 19 03:16:17 2018 New Revision: 26412 Log: Apache Spark 2.4.0-SNAPSHOT-2018_04_18_20_01-d5bec48 docs [This commit notification would consist of 1457 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
svn commit: r26415 - in /dev/spark/2.3.1-SNAPSHOT-2018_04_18_22_01-32bec6c-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Apr 19 05:17:07 2018 New Revision: 26415 Log: Apache Spark 2.3.1-SNAPSHOT-2018_04_18_22_01-32bec6c docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]