svn commit: r25110 - in /dev/spark/2.4.0-SNAPSHOT-2018_02_16_20_01-15ad4a7-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Sat Feb 17 04:15:48 2018
New Revision: 25110

Log: Apache Spark 2.4.0-SNAPSHOT-2018_02_16_20_01-15ad4a7 docs

[This commit notification would consist of 1444 parts, which exceeds the limit of 50 parts, so it was shortened to this summary.]
spark git commit: [SPARK-23447][SQL] Cleanup codegen template for Literal
Repository: spark
Updated Branches: refs/heads/master d5ed2108d -> 15ad4a7f1

[SPARK-23447][SQL] Cleanup codegen template for Literal

## What changes were proposed in this pull request?

Cleaned up the codegen templates for `Literal`s, to make sure that the `ExprCode` returned from `Literal.doGenCode()` has:
1. an empty `code` field;
2. an `isNull` field that is either the literal `true` or `false`;
3. a `value` field that is just a simple literal/constant.

Before this PR, a couple of paths would return a non-trivial `code`, and all of them are actually unnecessary. The `NaN` and `Infinity` constants for `double` and `float` can be accessed through constants that are directly available (e.g. `Float.NaN`, `Float.POSITIVE_INFINITY`), so there is no need to add a reference object for them. Also took the opportunity to add a new util method for ease of creating `ExprCode` for inline-able non-null values.

## How was this patch tested?

Existing tests.

Author: Kris Mok

Closes #20626 from rednaxelafx/codegen-literal.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15ad4a7f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15ad4a7f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15ad4a7f

Branch: refs/heads/master
Commit: 15ad4a7f1000c83cefbecd41e315c964caa3c39f
Parents: d5ed210
Author: Kris Mok
Authored: Sat Feb 17 10:54:14 2018 +0800
Committer: Wenchen Fan
Committed: Sat Feb 17 10:54:14 2018 +0800
--
 .../expressions/codegen/CodeGenerator.scala |  6 +++
 .../sql/catalyst/expressions/literals.scala | 51 +++-
 2 files changed, 34 insertions(+), 23 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/15ad4a7f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 4dcbb70..31ba29a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -58,6 +58,12 @@ import org.apache.spark.util.{ParentClassLoader, Utils}
  */
 case class ExprCode(var code: String, var isNull: String, var value: String)

+object ExprCode {
+  def forNonNullValue(value: String): ExprCode = {
+    ExprCode(code = "", isNull = "false", value = value)
+  }
+}
+
 /**
  * State used for subexpression elimination.
http://git-wip-us.apache.org/repos/asf/spark/blob/15ad4a7f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index cd176d9..c1e65e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -278,40 +278,45 @@ case class Literal (value: Any, dataType: DataType) extends LeafExpression {

   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val javaType = ctx.javaType(dataType)
-    // change the isNull and primitive to consts, to inline them
     if (value == null) {
-      ev.isNull = "true"
-      ev.copy(s"final $javaType ${ev.value} = ${ctx.defaultValue(dataType)};")
+      val defaultValueLiteral = ctx.defaultValue(javaType) match {
+        case "null" => s"(($javaType)null)"
+        case lit => lit
+      }
+      ExprCode(code = "", isNull = "true", value = defaultValueLiteral)
     } else {
-      ev.isNull = "false"
       dataType match {
         case BooleanType | IntegerType | DateType =>
-          ev.copy(code = "", value = value.toString)
+          ExprCode.forNonNullValue(value.toString)
        case FloatType =>
-          val v = value.asInstanceOf[Float]
-          if (v.isNaN || v.isInfinite) {
-            val boxedValue = ctx.addReferenceObj("boxedValue", v)
-            val code = s"final $javaType ${ev.value} = ($javaType) $boxedValue;"
-            ev.copy(code = code)
-          } else {
-            ev.copy(code = "", value = s"${value}f")
+          value.asInstanceOf[Float] match {
+            case v if v.isNaN =>
+              ExprCode.forNonNullValue("Float.NaN")
+            case Float.PositiveInfinity =>
+              ExprCode.forNonNullValue("Float.POSITIVE_INFINITY")
+
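To make the intent of the new template concrete, here is a small, self-contained Scala sketch (not part of the commit) of the shape `Literal.doGenCode()` now aims for: an empty `code`, a constant `isNull`, and a plain literal `value`. The `genFloatLiteral` helper and its `NEGATIVE_INFINITY` case are illustrative assumptions, since the hunk above is truncated.

```scala
// Standalone sketch, not Spark code: a simplified ExprCode and the kind of
// literal-to-ExprCode mapping the cleaned-up template produces.
case class ExprCode(var code: String, var isNull: String, var value: String)

object ExprCode {
  // Mirrors the helper added in CodeGenerator.scala: no setup code, never null.
  def forNonNullValue(value: String): ExprCode =
    ExprCode(code = "", isNull = "false", value = value)
}

// Hypothetical mapping in the spirit of Literal.doGenCode() for floats:
// every branch returns an empty `code`, a constant `isNull`, and a value that
// is a plain Java literal or constant, so it can be inlined by the caller.
def genFloatLiteral(v: Float): ExprCode = v match {
  case f if f.isNaN           => ExprCode.forNonNullValue("Float.NaN")
  case Float.PositiveInfinity => ExprCode.forNonNullValue("Float.POSITIVE_INFINITY")
  case Float.NegativeInfinity => ExprCode.forNonNullValue("Float.NEGATIVE_INFINITY") // assumed case
  case f                      => ExprCode.forNonNullValue(s"${f}f")
}

// Example: genFloatLiteral(Float.NaN) == ExprCode("", "false", "Float.NaN"),
// i.e. the generated Java references the constant directly instead of a boxed reference object.
```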
svn commit: r25109 - in /dev/spark/2.3.1-SNAPSHOT-2018_02_16_18_01-c7a0dea-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Sat Feb 17 02:16:02 2018
New Revision: 25109

Log: Apache Spark 2.3.1-SNAPSHOT-2018_02_16_18_01-c7a0dea docs

[This commit notification would consist of 1443 parts, which exceeds the limit of 50 parts, so it was shortened to this summary.]
[1/2] spark git commit: Preparing Spark release v2.3.0-rc4
Repository: spark
Updated Branches: refs/heads/branch-2.3 8360da071 -> c7a0dea46

Preparing Spark release v2.3.0-rc4

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44095cb6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44095cb6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44095cb6

Branch: refs/heads/branch-2.3
Commit: 44095cb65500739695b0324c177c19dfa1471472
Parents: 8360da0
Author: Sameer Agarwal
Authored: Fri Feb 16 17:29:46 2018 -0800
Committer: Sameer Agarwal
Committed: Fri Feb 16 17:29:46 2018 -0800
--
R/pkg/DESCRIPTION | 2 +- assembly/pom.xml | 2 +- common/kvstore/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml | 2 +- common/network-yarn/pom.xml | 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml | 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- docs/_config.yml | 4 ++-- examples/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10-sql/pom.xml | 2 +- external/kafka-0-10/pom.xml | 2 +- external/kafka-0-8-assembly/pom.xml | 2 +- external/kafka-0-8/pom.xml | 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- hadoop-cloud/pom.xml | 2 +- launcher/pom.xml | 2 +- mllib-local/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- python/pyspark/version.py | 2 +- repl/pom.xml | 2 +- resource-managers/kubernetes/core/pom.xml | 2 +- resource-managers/mesos/pom.xml | 2 +- resource-managers/yarn/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- 41 files changed, 42 insertions(+), 42 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/44095cb6/R/pkg/DESCRIPTION
--
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 29a8a00..6d46c31 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,6 +1,6 @@
 Package: SparkR
 Type: Package
-Version: 2.3.1
+Version: 2.3.0
 Title: R Frontend for Apache Spark
 Description: Provides an R Frontend for Apache Spark.
Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"), http://git-wip-us.apache.org/repos/asf/spark/blob/44095cb6/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 5c5a8e9..2ca9ab6 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.3.1-SNAPSHOT +2.3.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/44095cb6/common/kvstore/pom.xml -- diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml index 2a625da..404c744 100644 --- a/common/kvstore/pom.xml +++ b/common/kvstore/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.3.1-SNAPSHOT +2.3.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/44095cb6/common/network-common/pom.xml -- diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index adb1890..3c0b528 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.3.1-SNAPSHOT +2.3.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/44095cb6/common/network-shuffle/pom.xml -- diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index 4cdcfa2..fe3bcfd 100644 ---
[spark] Git Push Summary
Repository: spark
Updated Tags: refs/tags/v2.3.0-rc4 [created] 44095cb65
[2/2] spark git commit: Preparing development version 2.3.1-SNAPSHOT
Preparing development version 2.3.1-SNAPSHOT

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7a0dea4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7a0dea4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7a0dea4

Branch: refs/heads/branch-2.3
Commit: c7a0dea46a251a27b304ac2ec9f07f97aca4b1d0
Parents: 44095cb
Author: Sameer Agarwal
Authored: Fri Feb 16 17:29:51 2018 -0800
Committer: Sameer Agarwal
Committed: Fri Feb 16 17:29:51 2018 -0800
--
R/pkg/DESCRIPTION | 2 +- assembly/pom.xml | 2 +- common/kvstore/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml | 2 +- common/network-yarn/pom.xml | 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml | 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- docs/_config.yml | 4 ++-- examples/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10-sql/pom.xml | 2 +- external/kafka-0-10/pom.xml | 2 +- external/kafka-0-8-assembly/pom.xml | 2 +- external/kafka-0-8/pom.xml | 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- hadoop-cloud/pom.xml | 2 +- launcher/pom.xml | 2 +- mllib-local/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- python/pyspark/version.py | 2 +- repl/pom.xml | 2 +- resource-managers/kubernetes/core/pom.xml | 2 +- resource-managers/mesos/pom.xml | 2 +- resource-managers/yarn/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- 41 files changed, 42 insertions(+), 42 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/c7a0dea4/R/pkg/DESCRIPTION
--
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 6d46c31..29a8a00 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,6 +1,6 @@
 Package: SparkR
 Type: Package
-Version: 2.3.0
+Version: 2.3.1
 Title: R Frontend for Apache Spark
 Description: Provides an R Frontend for Apache Spark.
Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"), http://git-wip-us.apache.org/repos/asf/spark/blob/c7a0dea4/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 2ca9ab6..5c5a8e9 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.3.0 +2.3.1-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/c7a0dea4/common/kvstore/pom.xml -- diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml index 404c744..2a625da 100644 --- a/common/kvstore/pom.xml +++ b/common/kvstore/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.3.0 +2.3.1-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/c7a0dea4/common/network-common/pom.xml -- diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index 3c0b528..adb1890 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.3.0 +2.3.1-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/c7a0dea4/common/network-shuffle/pom.xml -- diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index fe3bcfd..4cdcfa2 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -22,7 +22,7 @@
spark git commit: [SPARK-23381][CORE] Murmur3 hash generates a different value from other implementations
Repository: spark
Updated Branches: refs/heads/master 0a73aa31f -> d5ed2108d

[SPARK-23381][CORE] Murmur3 hash generates a different value from other implementations

## What changes were proposed in this pull request?

Murmur3 hash generates a different value from the original and from other implementations (such as the Scala standard library and Guava) when the length of a byte array is not a multiple of 4.

## How was this patch tested?

Added a unit test.

**Note: When we merge this PR, please give all the credits to Shintaro Murakami.**

Author: Shintaro Murakami
Author: gatorsmile
Author: Shintaro Murakami

Closes #20630 from gatorsmile/pr-20568.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5ed2108
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5ed2108
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5ed2108

Branch: refs/heads/master
Commit: d5ed2108d32e1d95b26ee7fed39e8a733e935e2c
Parents: 0a73aa3
Author: Shintaro Murakami
Authored: Fri Feb 16 17:17:55 2018 -0800
Committer: gatorsmile
Committed: Fri Feb 16 17:17:55 2018 -0800
--
 .../spark/util/sketch/Murmur3_x86_32.java | 16 ++
 .../spark/unsafe/hash/Murmur3_x86_32.java | 16 ++
 .../spark/unsafe/hash/Murmur3_x86_32Suite.java | 19 +++
 .../apache/spark/ml/feature/FeatureHasher.scala | 33 +++-
 .../apache/spark/mllib/feature/HashingTF.scala | 2 +-
 .../spark/ml/feature/FeatureHasherSuite.scala | 11 ++-
 python/pyspark/ml/feature.py | 4 +--
 7 files changed, 96 insertions(+), 5 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d5ed2108/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_x86_32.java
--
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_x86_32.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_x86_32.java
index a61ce4f..e83b331 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_x86_32.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_x86_32.java
@@ -60,6 +60,8 @@ final class Murmur3_x86_32 {
   }

   public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
+    // This is not compatible with original and another implementations.
+    // But remain it for backward compatibility for the components existing before 2.3.
     assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
     int lengthAligned = lengthInBytes - lengthInBytes % 4;
     int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
@@ -71,6 +73,20 @@ final class Murmur3_x86_32 {
     return fmix(h1, lengthInBytes);
   }

+  public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes, int seed) {
+    // This is compatible with original and another implementations.
+    // Use this method for new components after Spark 2.3.
+    assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
+    int lengthAligned = lengthInBytes - lengthInBytes % 4;
+    int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
+    int k1 = 0;
+    for (int i = lengthAligned, shift = 0; i < lengthInBytes; i++, shift += 8) {
+      k1 ^= (Platform.getByte(base, offset + i) & 0xFF) << shift;
+    }
+    h1 ^= mixK1(k1);
+    return fmix(h1, lengthInBytes);
+  }
+
   private static int hashBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
     assert (lengthInBytes % 4 == 0);
     int h1 = seed;

http://git-wip-us.apache.org/repos/asf/spark/blob/d5ed2108/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
--
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
index 5e7ee48..d239de6 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
@@ -60,6 +60,8 @@ public final class Murmur3_x86_32 {
   }

   public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
+    // This is not compatible with original and another implementations.
+    // But remain it for backward compatibility for the components existing before 2.3.
     assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
     int lengthAligned = lengthInBytes - lengthInBytes % 4;
     int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
@@ -71,6 +73,20 @@ public final class
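For context, the incompatibility comes from how the trailing bytes (when the length is not a multiple of 4) are folded into the hash. Below is a standalone Scala sketch, not taken from the patch, of the reference-style tail handling that `hashUnsafeBytes2` adopts: accumulate the leftover bytes little-endian into a single `k1` and mix it once. The constants and mix/finalization steps are the standard Murmur3 x86_32 ones, written out here for illustration rather than quoting Spark's private helpers.

```scala
// Sketch only: reference-style Murmur3 x86_32 handling of the 1-3 trailing bytes.
object Murmur3TailSketch {
  private val C1 = 0xcc9e2d51
  private val C2 = 0x1b873593

  // Standard Murmur3 k1 mixing step.
  private def mixK1(k1: Int): Int = {
    var k = k1 * C1
    k = Integer.rotateLeft(k, 15)
    k * C2
  }

  // Standard Murmur3 finalization (xor with length, then avalanche).
  private def fmix(h1: Int, length: Int): Int = {
    var h = h1 ^ length
    h ^= h >>> 16
    h *= 0x85ebca6b
    h ^= h >>> 13
    h *= 0xc2b2ae35
    h ^= h >>> 16
    h
  }

  /** Fold the tail bytes after the 4-byte-aligned prefix into the running hash. */
  def hashTail(bytes: Array[Byte], h1AfterAlignedPrefix: Int): Int = {
    val lengthAligned = bytes.length - bytes.length % 4
    var k1 = 0
    var shift = 0
    var i = lengthAligned
    while (i < bytes.length) {          // at most 3 leftover bytes
      k1 ^= (bytes(i) & 0xFF) << shift  // little-endian accumulation into one k1
      i += 1
      shift += 8
    }
    fmix(h1AfterAlignedPrefix ^ mixK1(k1), bytes.length)
  }
}
```

The old `hashUnsafeBytes` mixes the leftover bytes differently, which is why it diverges from other implementations for lengths that are not a multiple of 4; it is kept only for components that already persisted hashes computed with it.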
spark git commit: [SPARK-23381][CORE] Murmur3 hash generates a different value from other implementations
Repository: spark
Updated Branches: refs/heads/branch-2.3 ccb0a59d7 -> 8360da071

[SPARK-23381][CORE] Murmur3 hash generates a different value from other implementations

## What changes were proposed in this pull request?

Murmur3 hash generates a different value from the original and from other implementations (such as the Scala standard library and Guava) when the length of a byte array is not a multiple of 4.

## How was this patch tested?

Added a unit test.

**Note: When we merge this PR, please give all the credits to Shintaro Murakami.**

Author: Shintaro Murakami
Author: gatorsmile
Author: Shintaro Murakami

Closes #20630 from gatorsmile/pr-20568.

(cherry picked from commit d5ed2108d32e1d95b26ee7fed39e8a733e935e2c)
Signed-off-by: gatorsmile

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8360da07
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8360da07
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8360da07

Branch: refs/heads/branch-2.3
Commit: 8360da07110d847a01b243e6d786922a5057ad9f
Parents: ccb0a59
Author: Shintaro Murakami
Authored: Fri Feb 16 17:17:55 2018 -0800
Committer: gatorsmile
Committed: Fri Feb 16 17:18:15 2018 -0800
--
 .../spark/util/sketch/Murmur3_x86_32.java | 16 ++
 .../spark/unsafe/hash/Murmur3_x86_32.java | 16 ++
 .../spark/unsafe/hash/Murmur3_x86_32Suite.java | 19 +++
 .../apache/spark/ml/feature/FeatureHasher.scala | 33 +++-
 .../apache/spark/mllib/feature/HashingTF.scala | 2 +-
 .../spark/ml/feature/FeatureHasherSuite.scala | 11 ++-
 python/pyspark/ml/feature.py | 4 +--
 7 files changed, 96 insertions(+), 5 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/8360da07/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_x86_32.java
--
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_x86_32.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_x86_32.java
index a61ce4f..e83b331 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_x86_32.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_x86_32.java
@@ -60,6 +60,8 @@ final class Murmur3_x86_32 {
   }

   public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
+    // This is not compatible with original and another implementations.
+    // But remain it for backward compatibility for the components existing before 2.3.
     assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
     int lengthAligned = lengthInBytes - lengthInBytes % 4;
     int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
@@ -71,6 +73,20 @@ final class Murmur3_x86_32 {
     return fmix(h1, lengthInBytes);
   }

+  public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes, int seed) {
+    // This is compatible with original and another implementations.
+    // Use this method for new components after Spark 2.3.
+    assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
+    int lengthAligned = lengthInBytes - lengthInBytes % 4;
+    int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
+    int k1 = 0;
+    for (int i = lengthAligned, shift = 0; i < lengthInBytes; i++, shift += 8) {
+      k1 ^= (Platform.getByte(base, offset + i) & 0xFF) << shift;
+    }
+    h1 ^= mixK1(k1);
+    return fmix(h1, lengthInBytes);
+  }
+
   private static int hashBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
     assert (lengthInBytes % 4 == 0);
     int h1 = seed;

http://git-wip-us.apache.org/repos/asf/spark/blob/8360da07/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
--
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
index 5e7ee48..d239de6 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
@@ -60,6 +60,8 @@ public final class Murmur3_x86_32 {
   }

   public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
+    // This is not compatible with original and another implementations.
+    // But remain it for backward compatibility for the components existing before 2.3.
     assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
     int lengthAligned =
svn commit: r25108 - in /dev/spark/2.4.0-SNAPSHOT-2018_02_16_16_01-0a73aa3-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Sat Feb 17 00:15:21 2018
New Revision: 25108

Log: Apache Spark 2.4.0-SNAPSHOT-2018_02_16_16_01-0a73aa3 docs

[This commit notification would consist of 1444 parts, which exceeds the limit of 50 parts, so it was shortened to this summary.]
[1/2] spark git commit: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Repository: spark Updated Branches: refs/heads/master c5857e496 -> 0a73aa31f http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala deleted file mode 100644 index 02c8764..000 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ /dev/null @@ -1,1122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.kafka010 - -import java.io._ -import java.nio.charset.StandardCharsets.UTF_8 -import java.nio.file.{Files, Paths} -import java.util.{Locale, Properties} -import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.atomic.AtomicInteger - -import scala.collection.mutable -import scala.util.Random - -import org.apache.kafka.clients.producer.RecordMetadata -import org.apache.kafka.common.TopicPartition -import org.scalatest.concurrent.PatienceConfiguration.Timeout -import org.scalatest.time.SpanSugar._ - -import org.apache.spark.SparkContext -import org.apache.spark.sql.{Dataset, ForeachWriter} -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution -import org.apache.spark.sql.functions.{count, window} -import org.apache.spark.sql.kafka010.KafkaSourceProvider._ -import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest} -import org.apache.spark.sql.streaming.util.StreamManualClock -import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} -import org.apache.spark.util.Utils - -abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { - - protected var testUtils: KafkaTestUtils = _ - - override val streamingTimeout = 30.seconds - - protected val brokerProps = Map[String, Object]() - - override def beforeAll(): Unit = { -super.beforeAll() -testUtils = new KafkaTestUtils(brokerProps) -testUtils.setup() - } - - override def afterAll(): Unit = { -if (testUtils != null) { - testUtils.teardown() - testUtils = null -} -super.afterAll() - } - - protected def makeSureGetOffsetCalled = AssertOnQuery { q => -// Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure -// its "getOffset" is called before pushing any data. Otherwise, because of the race condition, -// we don't know which data should be fetched when `startingOffsets` is latest. 
-q match { - case c: ContinuousExecution => c.awaitEpoch(0) - case m: MicroBatchExecution => m.processAllAvailable() -} -true - } - - protected def setTopicPartitions(topic: String, newCount: Int, query: StreamExecution) : Unit = { -testUtils.addPartitions(topic, newCount) - } - - /** - * Add data to Kafka. - * - * `topicAction` can be used to run actions for each topic before inserting data. - */ - case class AddKafkaData(topics: Set[String], data: Int*) -(implicit ensureDataInMultiplePartition: Boolean = false, - concurrent: Boolean = false, - message: String = "", - topicAction: (String, Option[Int]) => Unit = (_, _) => {}) extends AddData { - -override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = { - query match { -// Make sure no Spark job is running when deleting a topic -case Some(m: MicroBatchExecution) => m.processAllAvailable() -case _ => - } - - val existingTopics = testUtils.getAllTopicsAndPartitionSize().toMap - val newTopics = topics.diff(existingTopics.keySet) - for (newTopic <- newTopics) { -topicAction(newTopic, None) - } - for (existingTopicPartitions <- existingTopics) { -topicAction(existingTopicPartitions._1, Some(existingTopicPartitions._2)) - } - - require( -query.nonEmpty, -
[2/2] spark git commit: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
[SPARK-23362][SS] Migrate Kafka Microbatch source to v2

## What changes were proposed in this pull request?

Migrating KafkaSource (with data source v1) to KafkaMicroBatchReader (with data source v2).

Performance comparison: in a unit test with an in-process Kafka broker, I tested the read throughput of v1 and v2 using 20M records in a single partition. They were comparable.

## How was this patch tested?

Existing tests, a few of them modified to be better tests than the existing ones.

Author: Tathagata Das

Closes #20554 from tdas/SPARK-23362.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a73aa31
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a73aa31
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a73aa31

Branch: refs/heads/master
Commit: 0a73aa31f41c83503d5d99eff3c9d7b406014ab3
Parents: c5857e4
Author: Tathagata Das
Authored: Fri Feb 16 14:30:19 2018 -0800
Committer: Tathagata Das
Committed: Fri Feb 16 14:30:19 2018 -0800
--
 dev/.rat-excludes | 1 +
 .../sql/kafka010/CachedKafkaConsumer.scala | 2 +-
 .../sql/kafka010/KafkaContinuousReader.scala | 29 +-
 .../sql/kafka010/KafkaMicroBatchReader.scala | 403 ++
 .../KafkaRecordToUnsafeRowConverter.scala | 52 +
 .../apache/spark/sql/kafka010/KafkaSource.scala | 19 +-
 .../sql/kafka010/KafkaSourceProvider.scala | 70 +-
 ...fka-source-initial-offset-future-version.bin | 2 +
 ...afka-source-initial-offset-version-2.1.0.bin | 2 +-
 .../kafka010/KafkaMicroBatchSourceSuite.scala | 1222 ++
 .../spark/sql/kafka010/KafkaSourceSuite.scala | 1122 
 .../org/apache/spark/sql/internal/SQLConf.scala | 15 +-
 .../streaming/MicroBatchExecution.scala | 20 +-
 13 files changed, 1786 insertions(+), 1173 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/dev/.rat-excludes
--
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index 243fbe3..9552d00 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -105,3 +105,4 @@ META-INF/*
 spark-warehouse
 structured-streaming/*
 kafka-source-initial-offset-version-2.1.0.bin
+kafka-source-initial-offset-future-version.bin

http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
--
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 90ed7b1..e97881c 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition

 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaSource._
+import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.util.UninterruptibleThread

http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
--
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index b049a05..97a0f66 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE} +import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE} import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} import
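The migration is internal to the Kafka source; from the user's point of view a micro-batch Kafka query is still declared through the same public Structured Streaming API, whether the v1 KafkaSource or the v2 KafkaMicroBatchReader backs it. A minimal usage sketch follows (not from this patch; the broker address, topic name, and checkpoint path are placeholders):

```scala
import org.apache.spark.sql.SparkSession

object KafkaMicroBatchExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-microbatch-example")
      .getOrCreate()

    // Declare a streaming DataFrame over a Kafka topic; the reader
    // implementation behind this is chosen by Spark internally.
    val kafkaDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "events")                        // placeholder topic
      .option("startingOffsets", "latest")
      .load()

    // Kafka rows expose key/value as binary; cast them to strings for display.
    val query = kafkaDf
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/kafka-example-checkpoint") // placeholder path
      .start()

    query.awaitTermination()
  }
}
```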
svn commit: r25105 - in /dev/spark/2.4.0-SNAPSHOT-2018_02_16_12_01-c5857e4-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Fri Feb 16 20:15:37 2018
New Revision: 25105

Log: Apache Spark 2.4.0-SNAPSHOT-2018_02_16_12_01-c5857e4 docs

[This commit notification would consist of 1444 parts, which exceeds the limit of 50 parts, so it was shortened to this summary.]
svn commit: r25103 - in /dev/spark/2.3.1-SNAPSHOT-2018_02_16_10_01-ccb0a59-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Fri Feb 16 18:15:53 2018
New Revision: 25103

Log: Apache Spark 2.3.1-SNAPSHOT-2018_02_16_10_01-ccb0a59 docs

[This commit notification would consist of 1443 parts, which exceeds the limit of 50 parts, so it was shortened to this summary.]
spark git commit: [SPARK-23446][PYTHON] Explicitly check supported types in toPandas
Repository: spark
Updated Branches: refs/heads/master 1dc2c1d5e -> c5857e496

[SPARK-23446][PYTHON] Explicitly check supported types in toPandas

## What changes were proposed in this pull request?

This PR explicitly specifies and checks the types we support in `toPandas`. This was a hole: for example, binary type support is not finished on the Python side yet, but it is currently allowed, as below:

```python
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
df = spark.createDataFrame([[bytearray("a")]])
df.toPandas()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df.toPandas()
```

```
     _1
0  [97]
     _1
0     a
```

This should be disallowed. I think the same also applies to nested timestamps. I also added a nicer message about `spark.sql.execution.arrow.enabled` to the error message.

## How was this patch tested?

Manually tested and tests added in `python/pyspark/sql/tests.py`.

Author: hyukjinkwon

Closes #20625 from HyukjinKwon/pandas_convertion_supported_type.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5857e49
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5857e49
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5857e49

Branch: refs/heads/master
Commit: c5857e496ff0d170ed0339f14afc7d36b192da6d
Parents: 1dc2c1d
Author: hyukjinkwon
Authored: Fri Feb 16 09:41:17 2018 -0800
Committer: gatorsmile
Committed: Fri Feb 16 09:41:17 2018 -0800
--
 python/pyspark/sql/dataframe.py | 15 +--
 python/pyspark/sql/tests.py | 9 -
 2 files changed, 17 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/c5857e49/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 5cc8b63..f3e 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1988,10 +1988,11 @@ class DataFrame(object): if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true": try: from pyspark.sql.types import _check_dataframe_convert_date, \ -_check_dataframe_localize_timestamps +_check_dataframe_localize_timestamps, to_arrow_schema from pyspark.sql.utils import require_minimum_pyarrow_version -import pyarrow require_minimum_pyarrow_version() +import pyarrow +to_arrow_schema(self.schema) tables = self._collectAsArrow() if tables: table = pyarrow.concat_tables(tables) @@ -2000,10 +2001,12 @@ class DataFrame(object): return _check_dataframe_localize_timestamps(pdf, timezone) else: return pd.DataFrame.from_records([], columns=self.columns) -except ImportError as e: -msg = "note: pyarrow must be installed and available on calling Python process " \ - "if using spark.sql.execution.arrow.enabled=true" -raise ImportError("%s\n%s" % (_exception_message(e), msg)) +except Exception as e: +msg = ( +"Note: toPandas attempted Arrow optimization because " +"'spark.sql.execution.arrow.enabled' is set to true. 
Please set it to false " +"to disable this.") +raise RuntimeError("%s\n%s" % (_exception_message(e), msg)) else: pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) http://git-wip-us.apache.org/repos/asf/spark/blob/c5857e49/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 2af218a..1965307 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3497,7 +3497,14 @@ class ArrowTests(ReusedSQLTestCase): schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)]) df = self.spark.createDataFrame([(None,)], schema=schema) with QuietTest(self.sc): -with self.assertRaisesRegexp(Exception, 'Unsupported data type'): +with self.assertRaisesRegexp(Exception, 'Unsupported type'): +df.toPandas() + +df = self.spark.createDataFrame([(None,)], schema="a binary") +with QuietTest(self.sc): +with self.assertRaisesRegexp( +Exception, +'Unsupported type.*\nNote: toPandas attempted Arrow optimization because'):
spark git commit: [SPARK-23446][PYTHON] Explicitly check supported types in toPandas
Repository: spark
Updated Branches: refs/heads/branch-2.3 75bb19a01 -> ccb0a59d7

[SPARK-23446][PYTHON] Explicitly check supported types in toPandas

## What changes were proposed in this pull request?

This PR explicitly specifies and checks the types we support in `toPandas`. This was a hole: for example, binary type support is not finished on the Python side yet, but it is currently allowed, as below:

```python
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
df = spark.createDataFrame([[bytearray("a")]])
df.toPandas()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df.toPandas()
```

```
     _1
0  [97]
     _1
0     a
```

This should be disallowed. I think the same also applies to nested timestamps. I also added a nicer message about `spark.sql.execution.arrow.enabled` to the error message.

## How was this patch tested?

Manually tested and tests added in `python/pyspark/sql/tests.py`.

Author: hyukjinkwon

Closes #20625 from HyukjinKwon/pandas_convertion_supported_type.

(cherry picked from commit c5857e496ff0d170ed0339f14afc7d36b192da6d)
Signed-off-by: gatorsmile

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ccb0a59d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ccb0a59d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ccb0a59d

Branch: refs/heads/branch-2.3
Commit: ccb0a59d7383db451b86aee67423eb6e28f1f982
Parents: 75bb19a
Author: hyukjinkwon
Authored: Fri Feb 16 09:41:17 2018 -0800
Committer: gatorsmile
Committed: Fri Feb 16 09:41:32 2018 -0800
--
 python/pyspark/sql/dataframe.py | 15 +--
 python/pyspark/sql/tests.py | 9 -
 2 files changed, 17 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb0a59d/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index faee870..930d177 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1943,10 +1943,11 @@ class DataFrame(object): if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true": try: from pyspark.sql.types import _check_dataframe_convert_date, \ -_check_dataframe_localize_timestamps +_check_dataframe_localize_timestamps, to_arrow_schema from pyspark.sql.utils import require_minimum_pyarrow_version -import pyarrow require_minimum_pyarrow_version() +import pyarrow +to_arrow_schema(self.schema) tables = self._collectAsArrow() if tables: table = pyarrow.concat_tables(tables) @@ -1955,10 +1956,12 @@ class DataFrame(object): return _check_dataframe_localize_timestamps(pdf, timezone) else: return pd.DataFrame.from_records([], columns=self.columns) -except ImportError as e: -msg = "note: pyarrow must be installed and available on calling Python process " \ - "if using spark.sql.execution.arrow.enabled=true" -raise ImportError("%s\n%s" % (_exception_message(e), msg)) +except Exception as e: +msg = ( +"Note: toPandas attempted Arrow optimization because " +"'spark.sql.execution.arrow.enabled' is set to true. 
Please set it to false " +"to disable this.") +raise RuntimeError("%s\n%s" % (_exception_message(e), msg)) else: pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) http://git-wip-us.apache.org/repos/asf/spark/blob/ccb0a59d/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 904fa7a..da50b4d 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3443,7 +3443,14 @@ class ArrowTests(ReusedSQLTestCase): schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)]) df = self.spark.createDataFrame([(None,)], schema=schema) with QuietTest(self.sc): -with self.assertRaisesRegexp(Exception, 'Unsupported data type'): +with self.assertRaisesRegexp(Exception, 'Unsupported type'): +df.toPandas() + +df = self.spark.createDataFrame([(None,)], schema="a binary") +with QuietTest(self.sc): +with self.assertRaisesRegexp( +