spark git commit: [SPARK-23466][SQL] Remove redundant null checks in generated Java code by GenerateUnsafeProjection
Repository: spark
Updated Branches: refs/heads/master e1d72f2c0 -> c5583fdcd

[SPARK-23466][SQL] Remove redundant null checks in generated Java code by GenerateUnsafeProjection

## What changes were proposed in this pull request?

This PR addresses one of the TODOs in `GenerateUnsafeProjection`, "if the nullability of field is correct, we can use it to save null check", to simplify the generated code. When a field has `nullable=false` in its `DataType`, `GenerateUnsafeProjection` now omits the null-check code from the generated Java code.

## How was this patch tested?

Added new test cases to `GenerateUnsafeProjectionSuite`.

Closes #20637 from kiszk/SPARK-23466.

Authored-by: Kazuaki Ishizaki
Signed-off-by: Takuya UESHIN

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5583fdc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5583fdc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5583fdc

Branch: refs/heads/master
Commit: c5583fdcd2289559ad98371475eb7288ced9b148
Parents: e1d72f2
Author: Kazuaki Ishizaki
Authored: Sat Sep 1 12:19:19 2018 +0900
Committer: Takuya UESHIN
Committed: Sat Sep 1 12:19:19 2018 +0900
--
 .../codegen/GenerateUnsafeProjection.scala      | 77
 .../expressions/JsonExpressionsSuite.scala      |  2 +-
 .../codegen/GenerateUnsafeProjectionSuite.scala | 71 +-
 3 files changed, 117 insertions(+), 33 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/c5583fdc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
index 998a675..0ecd0de 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
@@ -32,6 +32,8 @@ import org.apache.spark.sql.types._
  */
 object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafeProjection] {

+  case class Schema(dataType: DataType, nullable: Boolean)
+
   /** Returns true iff we support this data type. */
   def canSupport(dataType: DataType): Boolean = UserDefinedType.sqlType(dataType) match {
     case NullType => true
@@ -43,19 +45,21 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
     case _ => false
   }

-  // TODO: if the nullability of field is correct, we can use it to save null check.
   private def writeStructToBuffer(
       ctx: CodegenContext,
       input: String,
       index: String,
-      fieldTypes: Seq[DataType],
+      schemas: Seq[Schema],
       rowWriter: String): String = {
     // Puts `input` in a local variable to avoid to re-evaluate it if it's a statement.
     val tmpInput = ctx.freshName("tmpInput")
-    val fieldEvals = fieldTypes.zipWithIndex.map { case (dt, i) =>
-      ExprCode(
-        JavaCode.isNullExpression(s"$tmpInput.isNullAt($i)"),
-        JavaCode.expression(CodeGenerator.getValue(tmpInput, dt, i.toString), dt))
+    val fieldEvals = schemas.zipWithIndex.map { case (Schema(dt, nullable), i) =>
+      val isNull = if (nullable) {
+        JavaCode.isNullExpression(s"$tmpInput.isNullAt($i)")
+      } else {
+        FalseLiteral
+      }
+      ExprCode(isNull, JavaCode.expression(CodeGenerator.getValue(tmpInput, dt, i.toString), dt))
     }

     val rowWriterClass = classOf[UnsafeRowWriter].getName
@@ -70,7 +74,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
       |  // Remember the current cursor so that we can calculate how many bytes are
       |  // written later.
       |  final int $previousCursor = $rowWriter.cursor();
-      |  ${writeExpressionsToBuffer(ctx, tmpInput, fieldEvals, fieldTypes, structRowWriter)}
+      |  ${writeExpressionsToBuffer(ctx, tmpInput, fieldEvals, schemas, structRowWriter)}
       |  $rowWriter.setOffsetAndSizeFromPreviousCursor($index, $previousCursor);
       |}
     """.stripMargin
@@ -80,7 +84,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
       ctx: CodegenContext,
       row: String,
       inputs: Seq[ExprCode],
-      inputTypes: Seq[DataType],
+      schemas: Seq[Schema],
       rowWriter: String,
       isTopLevel: Boolean = false): String = {
     val resetWriter = if (isTopLevel) {
@@ -98,8 +102,8 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
       s"$rowWriter.resetRowWriter();"
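The gist of the patch, as a minimal self-contained sketch (the `Schema` case class and `emitFieldWrite` helper below are hypothetical stand-ins, not Spark's actual `GenerateUnsafeProjection` code): the nullability flag decides whether an `isNullAt` guard is emitted at all.

```scala
// Hypothetical sketch of the null-check elision idea; not Spark's codegen classes.
case class Schema(javaType: String, nullable: Boolean)

def emitFieldWrite(input: String, writer: String, ordinal: Int, field: Schema): String = {
  // The write statement both branches share.
  val writeValue = s"$writer.write($ordinal, (${field.javaType}) $input.get($ordinal));"
  if (field.nullable) {
    // Nullable field: keep the isNullAt guard, as before this patch.
    s"""if ($input.isNullAt($ordinal)) {
       |  $writer.setNullAt($ordinal);
       |} else {
       |  $writeValue
       |}""".stripMargin
  } else {
    // nullable = false: the guard can never fire, so emit only the write.
    writeValue
  }
}

// A non-nullable field generates a single statement; a nullable one keeps the branch.
println(emitFieldWrite("row", "rowWriter", 0, Schema("int", nullable = false)))
println(emitFieldWrite("row", "rowWriter", 1, Schema("double", nullable = true)))
```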
svn commit: r29075 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_31_16_02-e1d72f2-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 31 23:16:05 2018 New Revision: 29075 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_31_16_02-e1d72f2 docs [This commit notification would consist of 1478 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-25264][K8S] Fix comma-delineated arguments passed into PythonRunner and RRunner
Repository: spark Updated Branches: refs/heads/master 32da87dfa -> e1d72f2c0 [SPARK-25264][K8S] Fix comma-delineated arguments passed into PythonRunner and RRunner ## What changes were proposed in this pull request? Fixes the issue brought up in https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/273 where the arguments were being comma-delineated, which was incorrect wrt to the PythonRunner and RRunner. ## How was this patch tested? Modified unit test to test this change. Author: Ilan Filonenko Closes #22257 from ifilonenko/SPARK-25264. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1d72f2c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1d72f2c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1d72f2c Branch: refs/heads/master Commit: e1d72f2c07ecd6f1880299e9373daa21cb032017 Parents: 32da87d Author: Ilan Filonenko Authored: Fri Aug 31 15:46:45 2018 -0700 Committer: mcheah Committed: Fri Aug 31 15:46:45 2018 -0700 -- .../deploy/k8s/features/bindings/PythonDriverFeatureStep.scala | 3 ++- .../spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala | 3 ++- .../k8s/features/bindings/PythonDriverFeatureStepSuite.scala | 4 ++-- .../deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala | 4 ++-- 4 files changed, 8 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e1d72f2c/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala -- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala index c20bcac..406944a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala @@ -30,11 +30,12 @@ private[spark] class PythonDriverFeatureStep( override def configurePod(pod: SparkPod): SparkPod = { val roleConf = kubernetesConf.roleSpecificConf require(roleConf.mainAppResource.isDefined, "PySpark Main Resource must be defined") +// Delineation is done by " " because that is input into PythonRunner val maybePythonArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map( pyArgs => new EnvVarBuilder() .withName(ENV_PYSPARK_ARGS) - .withValue(pyArgs.mkString(",")) + .withValue(pyArgs.mkString(" ")) .build()) val maybePythonFiles = kubernetesConf.pyFiles().map( // Dilineation by ":" is to append the PySpark Files to the PYTHONPATH http://git-wip-us.apache.org/repos/asf/spark/blob/e1d72f2c/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala -- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala index b33b86e..11b09b3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala @@ -30,11 +30,12 @@ private[spark] class RDriverFeatureStep( 
override def configurePod(pod: SparkPod): SparkPod = { val roleConf = kubernetesConf.roleSpecificConf require(roleConf.mainAppResource.isDefined, "R Main Resource must be defined") +// Delineation is done by " " because that is input into RRunner val maybeRArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map( rArgs => new EnvVarBuilder() .withName(ENV_R_ARGS) - .withValue(rArgs.mkString(",")) + .withValue(rArgs.mkString(" ")) .build()) val envSeq = Seq(new EnvVarBuilder() http://git-wip-us.apache.org/repos/asf/spark/blob/e1d72f2c/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala -- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
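A short illustration of why the delimiter matters (argument values and variable names below are made up for the example): the app args are flattened into a single environment variable and later split back into program arguments for PythonRunner/RRunner, so joining with "," collapses them into one argument while joining with " " round-trips correctly.

```scala
// Hypothetical app arguments; the env-var round trip is what the fix preserves.
val appArgs = Seq("--input", "s3a://bucket/data", "--threshold", "0.5")

// Before the fix: joined with ",", a later whitespace split sees ONE argument.
val commaJoined = appArgs.mkString(",")
println(commaJoined.split(" ").toSeq)  // one element: "--input,s3a://bucket/data,--threshold,0.5"

// After the fix: joined with " ", the split recovers the original four arguments.
val spaceJoined = appArgs.mkString(" ")
println(spaceJoined.split(" ").toSeq)  // Seq(--input, s3a://bucket/data, --threshold, 0.5)
```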
svn commit: r29067 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_31_12_02-32da87d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 31 19:16:27 2018 New Revision: 29067 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_31_12_02-32da87d docs [This commit notification would consist of 1478 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-25286][CORE] Removing the dangerous parmap
Repository: spark Updated Branches: refs/heads/master 7fc8881b0 -> 32da87dfa [SPARK-25286][CORE] Removing the dangerous parmap ## What changes were proposed in this pull request? I propose to remove one of `parmap` methods which accepts an execution context as a parameter. The method should be removed to eliminate any deadlocks that can occur if `parmap` is called recursively on thread pools restricted by size. Closes #22292 from MaxGekk/remove-overloaded-parmap. Authored-by: Maxim Gekk Signed-off-by: Xiao Li Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32da87df Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32da87df Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32da87df Branch: refs/heads/master Commit: 32da87dfa451fff677ed9316f740be2abdbff6a4 Parents: 7fc8881 Author: Maxim Gekk Authored: Fri Aug 31 10:43:30 2018 -0700 Committer: Xiao Li Committed: Fri Aug 31 10:43:30 2018 -0700 -- .../scala/org/apache/spark/rdd/UnionRDD.scala | 17 ++- .../org/apache/spark/util/ThreadUtils.scala | 32 +++- .../streaming/util/FileBasedWriteAheadLog.scala | 5 +-- 3 files changed, 16 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/32da87df/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala index 4b6f732..60e383a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala @@ -20,13 +20,12 @@ package org.apache.spark.rdd import java.io.{IOException, ObjectOutputStream} import scala.collection.mutable.ArrayBuffer -import scala.concurrent.ExecutionContext +import scala.collection.parallel.ForkJoinTaskSupport import scala.concurrent.forkjoin.ForkJoinPool import scala.reflect.ClassTag import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.util.ThreadUtils.parmap import org.apache.spark.util.Utils /** @@ -60,7 +59,8 @@ private[spark] class UnionPartition[T: ClassTag]( } object UnionRDD { - private[spark] lazy val threadPool = new ForkJoinPool(8) + private[spark] lazy val partitionEvalTaskSupport = +new ForkJoinTaskSupport(new ForkJoinPool(8)) } @DeveloperApi @@ -74,13 +74,14 @@ class UnionRDD[T: ClassTag]( rdds.length > conf.getInt("spark.rdd.parallelListingThreshold", 10) override def getPartitions: Array[Partition] = { -val partitionLengths = if (isPartitionListingParallel) { - implicit val ec = ExecutionContext.fromExecutor(UnionRDD.threadPool) - parmap(rdds)(_.partitions.length) +val parRDDs = if (isPartitionListingParallel) { + val parArray = rdds.par + parArray.tasksupport = UnionRDD.partitionEvalTaskSupport + parArray } else { - rdds.map(_.partitions.length) + rdds } -val array = new Array[Partition](partitionLengths.sum) +val array = new Array[Partition](parRDDs.map(_.partitions.length).seq.sum) var pos = 0 for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) { array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index) http://git-wip-us.apache.org/repos/asf/spark/blob/32da87df/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index f0e5add..cb0c205 100644 --- 
a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -284,36 +284,12 @@ private[spark] object ThreadUtils { try { implicit val ec = ExecutionContext.fromExecutor(pool) - parmap(in)(f) + val futures = in.map(x => Future(f(x))) + val futureSeq = Future.sequence(futures) + + awaitResult(futureSeq, Duration.Inf) } finally { pool.shutdownNow() } } - - /** - * Transforms input collection by applying the given function to each element in parallel fashion. - * Comparing to the map() method of Scala parallel collections, this method can be interrupted - * at any time. This is useful on canceling of task execution, for example. - * - * @param in - the input collection which should be transformed in parallel. - * @param f - the lambda function will be applied to each element of `in`. - * @param ec - an execution context for parallel applying of the given function `f`. - * @tparam I - the
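The pattern UnionRDD switches to, shown as a standalone sketch with a plain `Seq[String]` standing in for the RDDs (assuming Scala 2.11/2.12-style parallel collections with the same imports the diff uses): a shared, size-capped `ForkJoinTaskSupport` replaces the removed `parmap`-with-execution-context overload, so there is no caller-supplied context that recursive calls on a bounded pool could exhaust.

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool  // deprecated alias for java.util.concurrent.ForkJoinPool in 2.12

// Shared task support with 8 worker threads, mirroring UnionRDD.partitionEvalTaskSupport.
val partitionEvalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))

// Stand-in inputs; in UnionRDD these are the child RDDs whose partitions get listed.
val inputs: Seq[String] = (1 to 100).map(i => s"rdd-$i")

val parInputs = inputs.par
parInputs.tasksupport = partitionEvalTaskSupport

// Evaluate in parallel, then switch back to a sequential view for the final sum,
// mirroring `parRDDs.map(_.partitions.length).seq.sum` in the patch.
val total = parInputs.map(_.length).seq.sum
println(total)
```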
spark git commit: [SPARK-25296][SQL][TEST] Create ExplainSuite
Repository: spark Updated Branches: refs/heads/master 339859c4e -> 7fc8881b0 [SPARK-25296][SQL][TEST] Create ExplainSuite ## What changes were proposed in this pull request? Move the output verification of Explain test cases to a new suite ExplainSuite. ## How was this patch tested? N/A Closes #22300 from gatorsmile/test3200. Authored-by: Xiao Li Signed-off-by: Xiao Li Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7fc8881b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7fc8881b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7fc8881b Branch: refs/heads/master Commit: 7fc8881b0fbc3d85a524e0454fa89925e92c4fa4 Parents: 339859c Author: Xiao Li Authored: Fri Aug 31 08:47:20 2018 -0700 Committer: Xiao Li Committed: Fri Aug 31 08:47:20 2018 -0700 -- .../org/apache/spark/sql/DataFrameSuite.scala | 9 --- .../apache/spark/sql/DatasetCacheSuite.scala| 11 .../org/apache/spark/sql/DatasetSuite.scala | 10 .../org/apache/spark/sql/ExplainSuite.scala | 58 4 files changed, 58 insertions(+), 30 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7fc8881b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 6f5c730..d43fcf3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2553,13 +2553,4 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } } - test("SPARK-23034 show rdd names in RDD scan nodes") { -val rddWithName = spark.sparkContext.parallelize(Row(1, "abc") :: Nil).setName("testRdd") -val df2 = spark.createDataFrame(rddWithName, StructType.fromDDL("c0 int, c1 string")) -val output2 = new java.io.ByteArrayOutputStream() -Console.withOut(output2) { - df2.explain(extended = false) -} -assert(output2.toString.contains("Scan ExistingRDD testRdd")) - } } http://git-wip-us.apache.org/repos/asf/spark/blob/7fc8881b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala index 44177e3..5c6a021 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala @@ -206,15 +206,4 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits // first time use, load cache checkDataset(df5, Row(10)) } - - test("SPARK-24850 InMemoryRelation string representation does not include cached plan") { -val df = Seq(1).toDF("a").cache() -val outputStream = new java.io.ByteArrayOutputStream() -Console.withOut(outputStream) { - df.explain(false) -} -assert(outputStream.toString.replaceAll("#\\d+", "#x").contains( - "InMemoryRelation [a#x], StorageLevel(disk, memory, deserialized, 1 replicas)" -)) - } } http://git-wip-us.apache.org/repos/asf/spark/blob/7fc8881b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 6069f28..cf24eba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1498,16 +1498,6 @@ class 
DatasetSuite extends QueryTest with SharedSQLContext { df.where($"city".contains(new java.lang.Character('A'))), Seq(Row("Amsterdam"))) } - - test("SPARK-23034 show rdd names in RDD scan nodes") { -val rddWithName = spark.sparkContext.parallelize(SingleData(1) :: Nil).setName("testRdd") -val df = spark.createDataFrame(rddWithName) -val output = new java.io.ByteArrayOutputStream() -Console.withOut(output) { - df.explain(extended = false) -} -assert(output.toString.contains("Scan testRdd")) - } } case class TestDataUnion(x: Int, y: Int, z: Int) http://git-wip-us.apache.org/repos/asf/spark/blob/7fc8881b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
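All of the relocated tests follow the same capture-and-assert pattern: `Dataset.explain()` prints to stdout, so the output is grabbed with `Console.withOut` and checked as a string. A sketch of that pattern is below; the helper name and the normalization step are illustrative, not ExplainSuite's exact source.

```scala
import java.io.ByteArrayOutputStream

import org.apache.spark.sql.Dataset

// Capture the text that df.explain(...) prints to stdout.
def captureExplain(df: Dataset[_], extended: Boolean = false): String = {
  val output = new ByteArrayOutputStream()
  Console.withOut(output) {
    df.explain(extended)
  }
  // Normalize generated expression ids like "#123" so assertions stay stable across runs.
  output.toString.replaceAll("#\\d+", "#x")
}

// Usage inside a test body (assuming a SparkSession named `spark` is in scope):
// val df = spark.range(10).toDF("a").cache()
// assert(captureExplain(df).contains("InMemoryRelation"))
```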
svn commit: r29060 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_31_08_02-339859c-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 31 15:17:20 2018 New Revision: 29060 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_31_08_02-339859c docs [This commit notification would consist of 1478 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-25261][MINOR][DOC] update the description for spark.executor|driver.memory in configuration.md
Repository: spark
Updated Branches: refs/heads/master 8d9495a8f -> 339859c4e

[SPARK-25261][MINOR][DOC] update the description for spark.executor|driver.memory in configuration.md

## What changes were proposed in this pull request?

As described in [SPARK-25261](https://issues.apache.org/jira/projects/SPARK/issues/SPARK-25261), the unit of spark.executor.memory and spark.driver.memory is parsed as bytes in some cases if no unit is specified, while in https://spark.apache.org/docs/latest/configuration.html#application-properties they are described as MiB, which may lead to misunderstandings.

## How was this patch tested?

N/A

Closes #22252 from ivoson/branch-correct-configuration.

Lead-authored-by: huangtengfei02
Co-authored-by: Huang Tengfei
Signed-off-by: Sean Owen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/339859c4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/339859c4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/339859c4

Branch: refs/heads/master
Commit: 339859c4e4b27726ba8ce9a64451a981ef74de0c
Parents: 8d9495a
Author: huangtengfei02
Authored: Fri Aug 31 09:06:38 2018 -0500
Committer: Sean Owen
Committed: Fri Aug 31 09:06:38 2018 -0500
--
 docs/configuration.md | 9 +
 1 file changed, 5 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/339859c4/docs/configuration.md
--
diff --git a/docs/configuration.md b/docs/configuration.md
index b5ff426..f344bcd 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -152,8 +152,9 @@ of the most common options to set are:
   spark.driver.memory
   1g
-    Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in MiB
-    unless otherwise specified (e.g. 1g, 2g).
+    Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in the
+    same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t")
+    (e.g. 512m, 2g).
     Note: In client mode, this config must not be set through the SparkConf
     directly in your application, because the driver JVM has already started at that point.
@@ -175,8 +176,8 @@ of the most common options to set are:
   spark.executor.memory
   1g
-    Amount of memory to use per executor process, in MiB unless otherwise specified.
-    (e.g. 2g, 8g).
+    Amount of memory to use per executor process, in the same format as JVM memory strings with
+    a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g).
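A hedged usage example of the two settings the doc change covers (the app name and sizes below are made up): per the updated wording, give these values an explicit JVM-style size suffix rather than a bare number, since a bare number is not interpreted as MiB in every code path.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only; always pass an explicit size suffix ("k", "m", "g" or "t").
val spark = SparkSession.builder()
  .appName("memory-config-example")      // hypothetical app name
  .config("spark.executor.memory", "4g") // 4 GiB per executor JVM
  .config("spark.driver.memory", "2g")   // 2 GiB for the driver JVM; see note below
  .getOrCreate()

// Note from the doc itself: in client mode spark.driver.memory cannot take effect when
// set through SparkConf, because the driver JVM has already started; set it via
// spark-submit (--driver-memory 2g) or spark-defaults.conf instead.
```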
spark git commit: [SPARK-25207][SQL] Case-insensitve field resolution for filter pushdown when reading Parquet
Repository: spark Updated Branches: refs/heads/master 515708d5f -> 8d9495a8f [SPARK-25207][SQL] Case-insensitve field resolution for filter pushdown when reading Parquet ## What changes were proposed in this pull request? Currently, filter pushdown will not work if Parquet schema and Hive metastore schema are in different letter cases even spark.sql.caseSensitive is false. Like the below case: ```scala spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 8 * 1024 * 1024) spark.range(1, 40 * 1024 * 1024, 1, 1).sortWithinPartitions("id").write.parquet("/tmp/t") sql("CREATE TABLE t (ID LONG) USING parquet LOCATION '/tmp/t'") sql("select * from t where id < 100L").write.csv("/tmp/id") ``` Although filter "ID < 100L" is generated by Spark, it fails to pushdown into parquet actually, Spark still does the full table scan when reading. This PR provides a case-insensitive field resolution to make it work. Before - "ID < 100L" fail to pushedown: https://user-images.githubusercontent.com/2989575/44530558-40ef8b00-a721-11e8-8abc-7f97671590d3.png;> After - "ID < 100L" pushedown sucessfully: https://user-images.githubusercontent.com/2989575/44530567-44831200-a721-11e8-8634-e9f664b33d39.png;> ## How was this patch tested? Added UTs. Closes #22197 from yucai/SPARK-25207. Authored-by: yucai Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d9495a8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d9495a8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d9495a8 Branch: refs/heads/master Commit: 8d9495a8f1e64dbc42c3741f9bcbd4893ce3f0e9 Parents: 515708d Author: yucai Authored: Fri Aug 31 19:24:09 2018 +0800 Committer: Wenchen Fan Committed: Fri Aug 31 19:24:09 2018 +0800 -- .../datasources/parquet/ParquetFileFormat.scala | 3 +- .../datasources/parquet/ParquetFilters.scala| 90 ++- .../parquet/ParquetFilterSuite.scala| 115 ++- 3 files changed, 179 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8d9495a8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index d7eb143..ea4f159 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -347,6 +347,7 @@ class ParquetFileFormat val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold +val isCaseSensitive = sqlConf.caseSensitiveAnalysis (file: PartitionedFile) => { assert(file.partitionValues.numFields == partitionSchema.size) @@ -372,7 +373,7 @@ class ParquetFileFormat val pushed = if (enableParquetFilterPushDown) { val parquetSchema = footerFileMetaData.getSchema val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal, - pushDownStringStartWith, pushDownInFilterThreshold) + pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive) filters // Collects all converted Parquet filter predicates. 
Notice that not all predicates can be // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` http://git-wip-us.apache.org/repos/asf/spark/blob/8d9495a8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 58b4a76..0c286de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Long => JLong} import java.math.{BigDecimal => JBigDecimal} import java.sql.{Date, Timestamp} +import java.util.Locale import scala.collection.JavaConverters.asScalaBufferConverter @@
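The core of the change is resolving the filter's field name (which comes from the metastore schema, e.g. "ID") against the Parquet schema's field names (e.g. "id") case-insensitively when spark.sql.caseSensitive is false. A standalone sketch of that resolution step is below; the helper name and the behavior on ambiguous matches are illustrative, not the actual ParquetFilters implementation.

```scala
import java.util.Locale

// Build a resolver from Parquet field names; in case-insensitive mode, names are keyed
// by their lower-cased form, and a match that is ambiguous (two Parquet fields differing
// only in case) is not resolved rather than silently picking one.
def buildFieldResolver(parquetFieldNames: Seq[String],
                       caseSensitive: Boolean): String => Option[String] = {
  if (caseSensitive) {
    val names = parquetFieldNames.toSet
    (name: String) => if (names.contains(name)) Some(name) else None
  } else {
    val byLowerCase = parquetFieldNames.groupBy(_.toLowerCase(Locale.ROOT))
    (name: String) =>
      byLowerCase.get(name.toLowerCase(Locale.ROOT)) match {
        case Some(Seq(single)) => Some(single) // unambiguous match: push down against this name
        case _                 => None         // missing or ambiguous: skip pushdown here
      }
  }
}

// "ID" from the Hive metastore schema resolves to the Parquet column "id".
val resolve = buildFieldResolver(Seq("id", "value"), caseSensitive = false)
println(resolve("ID")) // Some(id)
```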
spark git commit: [SPARK-25183][SQL] Spark HiveServer2 to use Spark ShutdownHookManager
Repository: spark Updated Branches: refs/heads/master aa70a0a1a -> 515708d5f [SPARK-25183][SQL] Spark HiveServer2 to use Spark ShutdownHookManager ## What changes were proposed in this pull request? Switch `org.apache.hive.service.server.HiveServer2` to register its shutdown callback with Spark's `ShutdownHookManager`, rather than direct with the Java Runtime callback. This avoids race conditions in shutdown where the filesystem is shutdown before the flush/write/rename of the event log is completed, particularly on object stores where the write and rename can be slow. ## How was this patch tested? There's no explicit unit for test this, which is consistent with every other shutdown hook in the codebase. * There's an implicit test when the scalatest process is halted. * More manual/integration testing is needed. HADOOP-15679 has added the ability to explicitly execute the hadoop shutdown hook sequence which spark uses; that could be stabilized for testing if desired, after which all the spark hooks could be tested. Until then: external system tests only. Author: Steve Loughran Closes #22186 from steveloughran/BUG/SPARK-25183-shutdown. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/515708d5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/515708d5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/515708d5 Branch: refs/heads/master Commit: 515708d5f33d5acdb4206c626192d1838f8e691f Parents: aa70a0a Author: Steve Loughran Authored: Fri Aug 31 14:45:29 2018 +0800 Committer: jerryshao Committed: Fri Aug 31 14:45:29 2018 +0800 -- .../apache/hive/service/server/HiveServer2.java | 30 ++-- 1 file changed, 22 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/515708d5/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java -- diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java index 9bf96cf..a30be2b 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java @@ -20,6 +20,9 @@ package org.apache.hive.service.server; import java.util.Properties; +import scala.runtime.AbstractFunction0; +import scala.runtime.BoxedUnit; + import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; @@ -39,6 +42,8 @@ import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService; import org.apache.hive.service.cli.thrift.ThriftCLIService; import org.apache.hive.service.cli.thrift.ThriftHttpCLIService; +import org.apache.spark.util.ShutdownHookManager; + /** * HiveServer2. * @@ -67,13 +72,23 @@ public class HiveServer2 extends CompositeService { super.init(hiveConf); // Add a shutdown hook for catching SIGTERM & SIGINT -final HiveServer2 hiveServer2 = this; -Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { -hiveServer2.stop(); - } -}); +// this must be higher than the Hadoop Filesystem priority of 10, +// which the default priority is. 
+// The signature of the callback must match that of a scala () -> Unit +// function +ShutdownHookManager.addShutdownHook( +new AbstractFunction0() { + public BoxedUnit apply() { +try { + LOG.info("Hive Server Shutdown hook invoked"); + stop(); +} catch (Throwable e) { + LOG.warn("Ignoring Exception while stopping Hive Server from shutdown hook", + e); +} +return BoxedUnit.UNIT; + } +}); } public static boolean isHTTPTransportMode(HiveConf hiveConf) { @@ -95,7 +110,6 @@ public class HiveServer2 extends CompositeService { @Override public synchronized void stop() { LOG.info("Shutting down HiveServer2"); -HiveConf hiveConf = this.getHiveConf(); super.stop(); } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
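For readers more used to the Scala side, the registration in the Java diff above corresponds roughly to the sketch below. It is illustrative only: `org.apache.spark.util.ShutdownHookManager` is a Spark-internal (private[spark]) API, which is also why the Java patch has to express the `() => Unit` callback as an `AbstractFunction0<BoxedUnit>`. As the diff's comment notes, the default hook priority is higher than the Hadoop FileSystem hook's priority of 10, so the server is stopped (and the event log flushed) before filesystems are closed.

```scala
import org.apache.spark.util.ShutdownHookManager

// Register a stop callback with Spark's shutdown hook manager (sketch; compiles only
// from code living under the org.apache.spark package because the API is internal).
def registerStopHook(stopServer: () => Unit): AnyRef = {
  ShutdownHookManager.addShutdownHook { () =>
    try {
      stopServer()
    } catch {
      // As in the patch, never let a failing hook break the rest of shutdown.
      case e: Throwable => System.err.println(s"Ignoring exception from shutdown hook: $e")
    }
  }
}
```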
spark git commit: [SPARK-25288][TESTS] Fix flaky Kafka transaction tests
Repository: spark Updated Branches: refs/heads/master f29c2b528 -> aa70a0a1a [SPARK-25288][TESTS] Fix flaky Kafka transaction tests ## What changes were proposed in this pull request? Here are the failures: http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaRelationSuite_name=read+Kafka+transactional+messages%3A+read_committed http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV1SourceSuite_name=read+Kafka+transactional+messages%3A+read_committed http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite_name=read+Kafka+transactional+messages%3A+read_committed I found the Kafka consumer may not see the committed messages for a short time. This PR just adds a new method `waitUntilOffsetAppears` and uses it to make sure the consumer can see a specified offset before checking the result. ## How was this patch tested? Jenkins Closes #22293 from zsxwing/SPARK-25288. Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa70a0a1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa70a0a1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa70a0a1 Branch: refs/heads/master Commit: aa70a0a1a434e8a4b1d4dde00e20b865bb70b8dd Parents: f29c2b5 Author: Shixiong Zhu Authored: Thu Aug 30 23:23:11 2018 -0700 Committer: Shixiong Zhu Committed: Thu Aug 30 23:23:11 2018 -0700 -- .../kafka010/KafkaMicroBatchSourceSuite.scala | 34 .../spark/sql/kafka010/KafkaRelationSuite.scala | 7 .../spark/sql/kafka010/KafkaTestUtils.scala | 10 ++ 3 files changed, 37 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/aa70a0a1/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index eb66cca..78249f7 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -160,14 +160,18 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf } object WithOffsetSync { -def apply(topic: String)(func: () => Unit): StreamAction = { +/** + * Run `func` to write some Kafka messages and wait until the latest offset of the given + * `TopicPartition` is not less than `expectedOffset`. + */ +def apply( +topicPartition: TopicPartition, +expectedOffset: Long)(func: () => Unit): StreamAction = { Execute("Run Kafka Producer")(_ => { func() // This is a hack for the race condition that the committed message may be not visible to // consumer for a short time. -// Looks like after the following call returns, the consumer can always read the committed -// messages. 
-testUtils.getLatestOffsets(Set(topic)) +testUtils.waitUntilOffsetAppears(topicPartition, expectedOffset) }) } } @@ -652,13 +656,14 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { } } +val topicPartition = new TopicPartition(topic, 0) // The message values are the same as their offsets to make the test easy to follow testUtils.withTranscationalProducer { producer => testStream(mapped)( StartStream(ProcessingTime(100), clock), waitUntilBatchProcessed, CheckAnswer(), -WithOffsetSync(topic) { () => +WithOffsetSync(topicPartition, expectedOffset = 5) { () => // Send 5 messages. They should be visible only after being committed. producer.beginTransaction() (0 to 4).foreach { i => @@ -669,7 +674,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { waitUntilBatchProcessed, // Should not see any uncommitted messages CheckNewAnswer(), -WithOffsetSync(topic) { () => +WithOffsetSync(topicPartition, expectedOffset = 6) { () => producer.commitTransaction() }, AdvanceManualClock(100), @@ -678,7 +683,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { AdvanceManualClock(100), waitUntilBatchProcessed, CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a
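The shape of the new `waitUntilOffsetAppears` helper, as a generic sketch (illustrative; the real helper lives in `KafkaTestUtils` and queries the embedded broker): the test polls the latest offset of the topic-partition until it reaches the value the producer should have committed, so the consumer can no longer race ahead of the transaction commit.

```scala
// Poll until the latest offset reaches `expectedOffset` or a timeout elapses.
// `fetchLatestOffset` is a stand-in for however the test reads the broker's end offset.
def waitUntilOffsetAppears(fetchLatestOffset: () => Long,
                           expectedOffset: Long,
                           timeoutMs: Long = 60000L,
                           pollIntervalMs: Long = 100L): Unit = {
  val deadline = System.currentTimeMillis() + timeoutMs
  var latest = fetchLatestOffset()
  while (latest < expectedOffset && System.currentTimeMillis() < deadline) {
    Thread.sleep(pollIntervalMs)
    latest = fetchLatestOffset()
  }
  require(latest >= expectedOffset,
    s"Timed out waiting for offset $expectedOffset; latest offset seen was $latest")
}
```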