spark git commit: [SPARK-22682][SQL] HashExpression does not need to create global variables
Repository: spark
Updated Branches:
  refs/heads/master 295df746e -> a8af4da12


[SPARK-22682][SQL] HashExpression does not need to create global variables

## What changes were proposed in this pull request?

It turns out that `HashExpression` can pass some values around via parameters when splitting code into methods, to save some global variable slots.

This also prevents a weird case where a global variable appears in a parameter list, which was discovered by https://github.com/apache/spark/pull/19865

## How was this patch tested?

existing tests

Author: Wenchen Fan

Closes #19878 from cloud-fan/minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8af4da1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8af4da1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8af4da1

Branch: refs/heads/master
Commit: a8af4da12ce43cd5528a53b5f7f454e9dbe71d6e
Parents: 295df74
Author: Wenchen Fan
Authored: Tue Dec 5 12:43:05 2017 +0800
Committer: Wenchen Fan
Committed: Tue Dec 5 12:43:05 2017 +0800

--
 .../spark/sql/catalyst/expressions/hash.scala   | 118 +--
 .../expressions/HashExpressionsSuite.scala      |  34 --
 2 files changed, 106 insertions(+), 46 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/a8af4da1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
index c3289b8..d0ed2ab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -270,17 +270,36 @@ abstract class HashExpression[E] extends Expression {

   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     ev.isNull = "false"
-    val childrenHash = ctx.splitExpressions(children.map { child =>
+
+    val childrenHash = children.map { child =>
       val childGen = child.genCode(ctx)
       childGen.code + ctx.nullSafeExec(child.nullable, childGen.isNull) {
         computeHash(childGen.value, child.dataType, ev.value, ctx)
       }
-    })
+    }
+
+    val hashResultType = ctx.javaType(dataType)
+    val codes = if (ctx.INPUT_ROW == null || ctx.currentVars != null) {
+      childrenHash.mkString("\n")
+    } else {
+      ctx.splitExpressions(
+        expressions = childrenHash,
+        funcName = "computeHash",
+        arguments = Seq("InternalRow" -> ctx.INPUT_ROW, hashResultType -> ev.value),
+        returnType = hashResultType,
+        makeSplitFunction = body =>
+          s"""
+             |$body
+             |return ${ev.value};
+           """.stripMargin,
+        foldFunctions = _.map(funcCall => s"${ev.value} = $funcCall;").mkString("\n"))
+    }

-    ctx.addMutableState(ctx.javaType(dataType), ev.value)
-    ev.copy(code = s"""
-      ${ev.value} = $seed;
-      $childrenHash""")
+    ev.copy(code =
+      s"""
+         |$hashResultType ${ev.value} = $seed;
+         |$codes
+       """.stripMargin)
   }

   protected def nullSafeElementHash(
@@ -389,13 +408,21 @@ abstract class HashExpression[E] extends Expression {
       input: String,
       result: String,
       fields: Array[StructField]): String = {
-    val hashes = fields.zipWithIndex.map { case (field, index) =>
+    val fieldsHash = fields.zipWithIndex.map { case (field, index) =>
       nullSafeElementHash(input, index.toString, field.nullable, field.dataType, result, ctx)
     }
+    val hashResultType = ctx.javaType(dataType)
     ctx.splitExpressions(
-      expressions = hashes,
-      funcName = "getHash",
-      arguments = Seq("InternalRow" -> input))
+      expressions = fieldsHash,
+      funcName = "computeHashForStruct",
+      arguments = Seq("InternalRow" -> input, hashResultType -> result),
+      returnType = hashResultType,
+      makeSplitFunction = body =>
+        s"""
+           |$body
+           |return $result;
+         """.stripMargin,
+      foldFunctions = _.map(funcCall => s"$result = $funcCall;").mkString("\n"))
   }

   @tailrec
@@ -610,25 +637,44 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {

   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     ev.isNull = "false"
+    val childHash = ctx.freshName("childHash")
-    val childrenHash = ctx.splitExpressions(children.map { child =>
+    val childrenHash = children.map { child =>
       val childGen = child.genCode(ctx)
       val codeToComputeHash =
spark git commit: [SPARK-22677][SQL] cleanup whole stage codegen for hash aggregate
Repository: spark
Updated Branches:
  refs/heads/master 3887b7eef -> 295df746e


[SPARK-22677][SQL] cleanup whole stage codegen for hash aggregate

## What changes were proposed in this pull request?

The `HashAggregateExec` whole stage codegen path is a little messy and hard to understand; this change cleans it up a bit, especially the fast hash map part.

## How was this patch tested?

existing tests

Author: Wenchen Fan

Closes #19869 from cloud-fan/hash-agg.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/295df746
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/295df746
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/295df746

Branch: refs/heads/master
Commit: 295df746ecb1def5530a044d6670b28821da89f0
Parents: 3887b7e
Author: Wenchen Fan
Authored: Tue Dec 5 12:38:26 2017 +0800
Committer: Wenchen Fan
Committed: Tue Dec 5 12:38:26 2017 +0800

--
 .../execution/aggregate/HashAggregateExec.scala | 402 +--
 1 file changed, 195 insertions(+), 207 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/295df746/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 9139788..26d8cd7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
-import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
+import org.apache.spark.sql.execution.vectorized.{ColumnarRow, MutableColumnarRow}
 import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
 import org.apache.spark.unsafe.KVIterator
 import org.apache.spark.util.Utils
@@ -444,6 +444,7 @@ case class HashAggregateExec(
     val funcName = ctx.freshName("doAggregateWithKeysOutput")
     val keyTerm = ctx.freshName("keyTerm")
     val bufferTerm = ctx.freshName("bufferTerm")
+    val numOutput = metricTerm(ctx, "numOutputRows")

     val body = if (modes.contains(Final) || modes.contains(Complete)) {
@@ -520,6 +521,7 @@ case class HashAggregateExec(
       s"""
        private void $funcName(UnsafeRow $keyTerm, UnsafeRow $bufferTerm) throws java.io.IOException {
+         $numOutput.add(1);
         $body
        }
       """)
@@ -549,7 +551,7 @@ case class HashAggregateExec(
     isSupported && isNotByteArrayDecimalType
   }

-  private def enableTwoLevelHashMap(ctx: CodegenContext) = {
+  private def enableTwoLevelHashMap(ctx: CodegenContext): Unit = {
     if (!checkIfFastHashMapSupported(ctx)) {
       if (modes.forall(mode => mode == Partial || mode == PartialMerge) && !Utils.isTesting) {
         logInfo("spark.sql.codegen.aggregate.map.twolevel.enabled is set to true, but"
@@ -560,9 +562,8 @@ case class HashAggregateExec(
       // This is for testing/benchmarking only.
       // We enforce to first level to be a vectorized hashmap, instead of the default row-based one.
-      sqlContext.getConf("spark.sql.codegen.aggregate.map.vectorized.enable", null) match {
-        case "true" => isVectorizedHashMapEnabled = true
-        case null | "" | "false" => None
-      }
+      isVectorizedHashMapEnabled = sqlContext.getConf(
+        "spark.sql.codegen.aggregate.map.vectorized.enable", "false") == "true"
     }
   }

@@ -573,94 +574,84 @@ case class HashAggregateExec(
       enableTwoLevelHashMap(ctx)
     } else {
       sqlContext.getConf("spark.sql.codegen.aggregate.map.vectorized.enable", null) match {
-        case "true" => logWarning("Two level hashmap is disabled but vectorized hashmap is " +
-          "enabled.")
-        case null | "" | "false" => None
+        case "true" =>
+          logWarning("Two level hashmap is disabled but vectorized hashmap is enabled.")
+        case _ =>
       }
     }
-    fastHashMapTerm = ctx.freshName("fastHashMap")
-    val fastHashMapClassName = ctx.freshName("FastHashMap")
-    val fastHashMapGenerator =
-      if (isVectorizedHashMapEnabled) {
-        new VectorizedHashMapGenerator(ctx, aggregateExpressions,
-          fastHashMapClassName, groupingKeySchema, bufferSchema)
-      } else {
-        new RowBasedHashMapGenerator(ctx,
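One cleanup visible in the diff replaces a null-handling pattern match on `spark.sql.codegen.aggregate.map.vectorized.enable` with a single defaulted read. Below is a minimal sketch of that idiom; `getConf` here is a local stand-in backed by a plain map, not the real `sqlContext.getConf`.

```scala
// Minimal sketch of the config-reading cleanup; the map below is a fake configuration
// source and `getConf` is a stand-in for sqlContext.getConf, not Spark's API.
object ConfFlagSketch {
  private val conf = Map("spark.sql.codegen.aggregate.map.vectorized.enable" -> "true")
  private def getConf(key: String, default: String): String = conf.getOrElse(key, default)

  // Before: match on a possibly-null string with cases for "true", null, "" and "false".
  // After: read with an explicit default and compare once.
  val isVectorizedHashMapEnabled: Boolean =
    getConf("spark.sql.codegen.aggregate.map.vectorized.enable", "false") == "true"

  def main(args: Array[String]): Unit = println(isVectorizedHashMapEnabled)
}
```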
spark git commit: [SPARK-22665][SQL] Avoid repartitioning with empty list of expressions
Repository: spark
Updated Branches:
  refs/heads/master 1d5597b40 -> 3887b7eef


[SPARK-22665][SQL] Avoid repartitioning with empty list of expressions

## What changes were proposed in this pull request?

Repartitioning by an empty set of expressions is currently possible, even though it is a case which is not handled properly. Indeed, `HashExpression` has a check to avoid running it on an empty set, but this check is not performed while repartitioning. Thus, the PR adds a check to avoid this situation.

## How was this patch tested?

added UT

Author: Marco Gaido

Closes #19870 from mgaido91/SPARK-22665.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3887b7ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3887b7ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3887b7ee

Branch: refs/heads/master
Commit: 3887b7eef7b89d3aeecadebc0fdafa47586a232b
Parents: 1d5597b
Author: Marco Gaido
Authored: Mon Dec 4 17:08:56 2017 -0800
Committer: gatorsmile
Committed: Mon Dec 4 17:08:56 2017 -0800

--
 .../catalyst/plans/logical/basicLogicalOperators.scala | 12 +++-
 .../spark/sql/catalyst/analysis/AnalysisSuite.scala    |  5 -
 2 files changed, 11 insertions(+), 6 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/3887b7ee/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 93de7c1..ba5f97d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -22,8 +22,8 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.statsEstimation._
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning,
+  RangePartitioning, RoundRobinPartitioning}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 import org.apache.spark.util.random.RandomSampler
@@ -847,14 +847,16 @@ case class RepartitionByExpression(
       "`SortOrder`, which means `RangePartitioning`, or none of them are `SortOrder`, which " +
         "means `HashPartitioning`. In this case we have:" +
         s"""
-           |SortOrder: ${sortOrder}
-           |NonSortOrder: ${nonSortOrder}
+           |SortOrder: $sortOrder
+           |NonSortOrder: $nonSortOrder
          """.stripMargin)

     if (sortOrder.nonEmpty) {
       RangePartitioning(sortOrder.map(_.asInstanceOf[SortOrder]), numPartitions)
-    } else {
+    } else if (nonSortOrder.nonEmpty) {
       HashPartitioning(nonSortOrder, numPartitions)
+    } else {
+      RoundRobinPartitioning(numPartitions)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3887b7ee/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
--
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 0e2e706..109fb32 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{Cross, Inner}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning,
+  RangePartitioning, RoundRobinPartitioning}
 import org.apache.spark.sql.types._
@@ -530,6 +531,8 @@ class AnalysisSuite extends AnalysisTest with Matchers {
     checkPartitioning[RangePartitioning](numPartitions = 10,
       exprs = SortOrder('a.attr, Ascending), SortOrder('b.attr, Descending))

+
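The new fallback can be summarized with a small stand-alone sketch. The case classes below are simplified stand-ins for Spark's physical `Partitioning` hierarchy, not the real classes:

```scala
// Simplified model of the partitioning choice after this patch: an empty expression
// list now falls back to round-robin partitioning instead of building a
// HashPartitioning over zero expressions.
object RepartitionChoiceSketch {
  sealed trait Partitioning
  case class RangePartitioning(sortOrder: Seq[String], numPartitions: Int) extends Partitioning
  case class HashPartitioning(exprs: Seq[String], numPartitions: Int) extends Partitioning
  case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning

  def choose(sortOrder: Seq[String], nonSortOrder: Seq[String], numPartitions: Int): Partitioning =
    if (sortOrder.nonEmpty) RangePartitioning(sortOrder, numPartitions)
    else if (nonSortOrder.nonEmpty) HashPartitioning(nonSortOrder, numPartitions)
    else RoundRobinPartitioning(numPartitions) // e.g. df.repartition(10) with no columns

  def main(args: Array[String]): Unit = {
    println(choose(Nil, Seq("a"), 10)) // HashPartitioning
    println(choose(Nil, Nil, 10))      // RoundRobinPartitioning (the previously broken case)
  }
}
```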
spark git commit: [SPARK-22626][SQL][FOLLOWUP] improve documentation and simplify test case
Repository: spark
Updated Branches:
  refs/heads/master e1dd03e42 -> 1d5597b40


[SPARK-22626][SQL][FOLLOWUP] improve documentation and simplify test case

## What changes were proposed in this pull request?

This PR improves documentation for not using zero `numRows` statistics and simplifies the test case.

The reason why some Hive tables have zero `numRows` is that, in Hive, when stats gathering is disabled, `numRows` is always zero after INSERT command:
```
hive> create table src (key int, value string) stored as orc;
hive> desc formatted src;
Table Parameters:
	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\"}
	numFiles	0
	numRows	0
	rawDataSize	0
	totalSize	0
	transient_lastDdlTime	1512399590

hive> set hive.stats.autogather=false;
hive> insert into src select 1, 'a';
hive> desc formatted src;
Table Parameters:
	numFiles	1
	numRows	0
	rawDataSize	0
	totalSize	275
	transient_lastDdlTime	1512399647

hive> insert into src select 1, 'b';
hive> desc formatted src;
Table Parameters:
	numFiles	2
	numRows	0
	rawDataSize	0
	totalSize	550
	transient_lastDdlTime	1512399687
```

## How was this patch tested?

Modified existing test.

Author: Zhenhua Wang

Closes #19880 from wzhfy/doc_zero_rowCount.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1d5597b4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1d5597b4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1d5597b4

Branch: refs/heads/master
Commit: 1d5597b408485e41812f3645a670864ad88570a0
Parents: e1dd03e
Author: Zhenhua Wang
Authored: Mon Dec 4 15:08:07 2017 -0800
Committer: gatorsmile
Committed: Mon Dec 4 15:08:07 2017 -0800

--
 .../apache/spark/sql/hive/client/HiveClientImpl.scala |  8 +---
 .../org/apache/spark/sql/hive/StatisticsSuite.scala   | 11 +--
 2 files changed, 10 insertions(+), 9 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/1d5597b4/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 77e8360..08eb5c7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -426,9 +426,11 @@ private[hive] class HiveClientImpl(
     // TODO: stats should include all the other two fields (`numFiles` and `numPartitions`).
     // (see StatsSetupConst in Hive)
     val stats =
-      // When table is external, `totalSize` is always zero, which will influence join strategy
-      // so when `totalSize` is zero, use `rawDataSize` instead. When `rawDataSize` is also zero,
-      // return None. Later, we will use the other ways to estimate the statistics.
+      // When table is external, `totalSize` is always zero, which will influence join strategy.
+      // So when `totalSize` is zero, use `rawDataSize` instead. When `rawDataSize` is also zero,
+      // return None.
+      // In Hive, when statistics gathering is disabled, `rawDataSize` and `numRows` is always
+      // zero after INSERT command. So they are used here only if they are larger than zero.
       if (totalSize.isDefined && totalSize.get > 0L) {
         Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount.filter(_ > 0)))
       } else if (rawDataSize.isDefined && rawDataSize.get > 0) {

http://git-wip-us.apache.org/repos/asf/spark/blob/1d5597b4/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
--
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index ee027e5..13f06a2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -1366,17 +1366,16 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     sql("CREATE TABLE maybe_big (c1 bigint)" +
       "TBLPROPERTIES ('numRows'='0', 'rawDataSize'='600', 'totalSize'='8')")

-    val relation =
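A condensed sketch of the fallback logic being documented, with `CatalogStatistics` reduced to a local case class (the real implementation is the `HiveClientImpl` code in the diff above):

```scala
// Sketch of the statistics fallback: zero-valued Hive statistics are treated as
// "unknown" rather than taken literally, because Hive reports 0 when stats gathering
// is disabled. CatalogStatistics here is a simplified stand-in.
object HiveStatsSketch {
  case class CatalogStatistics(sizeInBytes: BigInt, rowCount: Option[BigInt])

  def toCatalogStats(
      totalSize: Option[Long],
      rawDataSize: Option[Long],
      numRows: Option[Long]): Option[CatalogStatistics] = {
    val rowCount = numRows.filter(_ > 0).map(BigInt(_)) // only trust a positive numRows
    if (totalSize.exists(_ > 0L)) Some(CatalogStatistics(BigInt(totalSize.get), rowCount))
    else if (rawDataSize.exists(_ > 0L)) Some(CatalogStatistics(BigInt(rawDataSize.get), rowCount))
    else None // defer to other estimation strategies
  }

  def main(args: Array[String]): Unit =
    // Table written with hive.stats.autogather=false: totalSize is usable, numRows is not.
    println(toCatalogStats(totalSize = Some(550L), rawDataSize = Some(0L), numRows = Some(0L)))
}
```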
spark git commit: [SPARK-22372][CORE, YARN] Make cluster submission use SparkApplication.
Repository: spark
Updated Branches:
  refs/heads/master f81401e1c -> e1dd03e42


[SPARK-22372][CORE, YARN] Make cluster submission use SparkApplication.

The main goal of this change is to allow multiple cluster-mode submissions from the same JVM, without having them end up with mixed configuration. That is done by extending the SparkApplication trait, and doing so was reasonably trivial for standalone and mesos modes.

For YARN mode, there was a complication. YARN used a "SPARK_YARN_MODE" system property to control behavior indirectly in a whole bunch of places, mainly in the SparkHadoopUtil / YarnSparkHadoopUtil classes. Most of the changes here are removing that.

Since we removed support for Hadoop 1.x, some methods that lived in YarnSparkHadoopUtil can now live in SparkHadoopUtil. The remaining methods don't need to be part of the class, and can be called directly from the YarnSparkHadoopUtil object, so now there's a single implementation of SparkHadoopUtil.

There were two places in the code that relied on SPARK_YARN_MODE to make decisions about YARN-specific functionality, and now explicitly check the master from the configuration for that instead:

* fetching the external shuffle service port, which can come from the YARN configuration.
* propagation of the authentication secret using Hadoop credentials. This also was cleaned up a little to not need so many methods in `SparkHadoopUtil`.

With those out of the way, actually changing the YARN client to extend SparkApplication was easy.

Tested with existing unit tests, and also by running YARN apps with auth and kerberos both on and off in a real cluster.

Author: Marcelo Vanzin

Closes #19631 from vanzin/SPARK-22372.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1dd03e4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1dd03e4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1dd03e4

Branch: refs/heads/master
Commit: e1dd03e42c2131b167b1e80c761291e88bfdf03f
Parents: f81401e
Author: Marcelo Vanzin
Authored: Mon Dec 4 11:05:03 2017 -0800
Committer: Marcelo Vanzin
Committed: Mon Dec 4 11:05:03 2017 -0800

--
 .../org/apache/spark/SecurityManager.scala      | 102 +++---
 .../scala/org/apache/spark/SparkContext.scala   |   3 -
 .../main/scala/org/apache/spark/SparkEnv.scala  |   4 +
 .../scala/org/apache/spark/deploy/Client.scala  |   8 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  48 +--
 .../org/apache/spark/deploy/SparkSubmit.scala   |  31 ++---
 .../deploy/rest/RestSubmissionClient.scala      |  31 ++---
 .../executor/CoarseGrainedExecutorBackend.scala |  10 +-
 .../scala/org/apache/spark/util/Utils.scala     |   6 +-
 .../org/apache/spark/SecurityManagerSuite.scala |  31 -
 .../apache/spark/deploy/SparkSubmitSuite.scala  |   8 +-
 .../deploy/rest/StandaloneRestSubmitSuite.scala |   2 +-
 project/MimaExcludes.scala                      |   6 +
 .../spark/deploy/yarn/ApplicationMaster.scala   |  54
 .../org/apache/spark/deploy/yarn/Client.scala   |  65 +
 .../apache/spark/deploy/yarn/YarnRMClient.scala |   2 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 135 ++-
 .../yarn/security/AMCredentialRenewer.scala     |   2 +-
 .../cluster/YarnClientSchedulerBackend.scala    |   4 +-
 .../cluster/YarnClusterSchedulerBackend.scala   |   2 +-
 .../deploy/yarn/BaseYarnClusterSuite.scala      |   5 -
 .../apache/spark/deploy/yarn/ClientSuite.scala  |  31 +
 .../spark/deploy/yarn/YarnClusterSuite.scala    |  20 ++-
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala  |  49 +--
 .../YARNHadoopDelegationTokenManagerSuite.scala |  11 +-
 25 files changed, 274 insertions(+), 396 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/core/src/main/scala/org/apache/spark/SecurityManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 2480e56..4c1dbe3 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark

 import java.lang.{Byte => JByte}
 import java.net.{Authenticator, PasswordAuthentication}
+import java.nio.charset.StandardCharsets.UTF_8
 import java.security.{KeyStore, SecureRandom}
 import java.security.cert.X509Certificate
 import javax.net.ssl._
@@ -26,10 +27,11 @@ import javax.net.ssl._
 import com.google.common.hash.HashCodes
 import com.google.common.io.Files
 import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}

-import
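Spark's actual SparkApplication trait is internal to the deploy package, so the sketch below only mirrors the idea with illustrative names: each submission receives its own configuration object instead of reading JVM-wide state such as the SPARK_YARN_MODE system property, which is what allows two cluster-mode submissions in one JVM to stay isolated.

```scala
// Illustrative only: not Spark's real SparkApplication trait or deploy classes.
// The point is the shape of the contract, i.e. per-submission configuration passed in
// explicitly rather than read from global system properties.
object SparkApplicationSketch {
  final case class SubmissionConf(settings: Map[String, String]) {
    def get(key: String, default: String): String = settings.getOrElse(key, default)
  }

  trait ClusterApplication {
    // Everything the client needs comes in through `conf`, never from System.getProperty.
    def start(args: Array[String], conf: SubmissionConf): Unit
  }

  class YarnStyleClient extends ClusterApplication {
    override def start(args: Array[String], conf: SubmissionConf): Unit = {
      val master = conf.get("spark.master", "yarn")
      println(s"submitting ${args.mkString(" ")} to $master")
    }
  }

  def main(args: Array[String]): Unit = {
    // Two submissions in the same JVM, each with isolated configuration.
    new YarnStyleClient().start(Array("appA.jar"), SubmissionConf(Map("spark.master" -> "yarn")))
    new YarnStyleClient().start(Array("appB.jar"), SubmissionConf(Map("spark.master" -> "local[2]")))
  }
}
```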
spark git commit: [SPARK-22162] Executors and the driver should use consistent JobIDs in the RDD commit protocol
Repository: spark
Updated Branches:
  refs/heads/master 3927bb9b4 -> f81401e1c


[SPARK-22162] Executors and the driver should use consistent JobIDs in the RDD commit protocol

I have modified SparkHadoopWriter so that executors and the driver always use consistent JobIds during the Hadoop commit. Before SPARK-18191, Spark always used the rddId; it just incorrectly named the variable stageId. After SPARK-18191, it used the rddId as the jobId on the driver's side, and the stageId as the jobId on the executors' side. With this change, executors and the driver consistently use the rddId as the jobId.

Also with this change, during the Hadoop commit protocol Spark uses the actual stageId to check whether a stage can be committed, unlike before, when it used the executors' jobId for this check.

In addition to the existing unit tests, a test has been added to check whether executors and the driver are using the same JobId. The test failed before this change and passed after applying this fix.

Author: Reza Safi

Closes #19848 from rezasafi/stagerddsimple.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f81401e1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f81401e1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f81401e1

Branch: refs/heads/master
Commit: f81401e1cb39f2d6049b79dc8d61305f3371276f
Parents: 3927bb9
Author: Reza Safi
Authored: Mon Dec 4 09:23:48 2017 -0800
Committer: Marcelo Vanzin
Committed: Mon Dec 4 09:23:48 2017 -0800

--
 .../spark/internal/io/SparkHadoopWriter.scala   | 12 +++---
 .../spark/mapred/SparkHadoopMapRedUtil.scala    |  5 ++-
 .../spark/rdd/PairRDDFunctionsSuite.scala       | 44
 3 files changed, 53 insertions(+), 8 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f81401e1/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
--
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
index 949d8c6..abf3921 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
@@ -60,17 +60,17 @@ object SparkHadoopWriter extends Logging {
       config: HadoopWriteConfigUtil[K, V]): Unit = {
     // Extract context and configuration from RDD.
     val sparkContext = rdd.context
-    val stageId = rdd.id
+    val commitJobId = rdd.id

     // Set up a job.
     val jobTrackerId = createJobTrackerID(new Date())
-    val jobContext = config.createJobContext(jobTrackerId, stageId)
+    val jobContext = config.createJobContext(jobTrackerId, commitJobId)
     config.initOutputFormat(jobContext)

     // Assert the output format/key/value class is set in JobConf.
     config.assertConf(jobContext, rdd.conf)

-    val committer = config.createCommitter(stageId)
+    val committer = config.createCommitter(commitJobId)
     committer.setupJob(jobContext)

     // Try to write all RDD partitions as a Hadoop OutputFormat.
@@ -80,7 +80,7 @@ object SparkHadoopWriter extends Logging {
         context = context,
         config = config,
         jobTrackerId = jobTrackerId,
-        sparkStageId = context.stageId,
+        commitJobId = commitJobId,
         sparkPartitionId = context.partitionId,
         sparkAttemptNumber = context.attemptNumber,
         committer = committer,
@@ -102,14 +102,14 @@ object SparkHadoopWriter extends Logging {
       context: TaskContext,
       config: HadoopWriteConfigUtil[K, V],
       jobTrackerId: String,
-      sparkStageId: Int,
+      commitJobId: Int,
       sparkPartitionId: Int,
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
       iterator: Iterator[(K, V)]): TaskCommitMessage = {
     // Set up a task.
     val taskContext = config.createTaskAttemptContext(
-      jobTrackerId, sparkStageId, sparkPartitionId, sparkAttemptNumber)
+      jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber)
     committer.setupTask(taskContext)

     val (outputMetrics, callback) = initHadoopOutputMetrics(context)

http://git-wip-us.apache.org/repos/asf/spark/blob/f81401e1/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
--
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 607283a..764735d 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++
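The invariant being enforced can be stated as a tiny executable check. The names below are illustrative and do not correspond to the real SparkHadoopWriter signatures:

```scala
// Simplified model of the invariant this change enforces: both the driver and every
// task derive the Hadoop job id from the same value, the RDD id, while the Spark stage
// id is only used to decide whether a task attempt may commit.
object CommitJobIdSketch {
  final case class TaskInfo(stageId: Int, partitionId: Int, attemptNumber: Int)

  def driverJobId(rddId: Int): Int = rddId
  def executorJobId(rddId: Int, task: TaskInfo): Int = rddId // no longer task.stageId

  def main(args: Array[String]): Unit = {
    val rddId = 7
    val task = TaskInfo(stageId = 3, partitionId = 0, attemptNumber = 0)
    // The property the new unit test checks: both sides see the same job id.
    assert(driverJobId(rddId) == executorJobId(rddId, task))
    println(s"jobId on driver and executors: ${driverJobId(rddId)}")
  }
}
```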
spark git commit: [SPARK-22473][FOLLOWUP][TEST] Remove deprecated Date functions
Repository: spark
Updated Branches:
  refs/heads/master 4131ad03f -> 3927bb9b4


[SPARK-22473][FOLLOWUP][TEST] Remove deprecated Date functions

## What changes were proposed in this pull request?

#19696 replaced the deprecated usages for `Date` and `Waiter`, but a few methods were missed. The PR fixes the forgotten deprecated usages.

## How was this patch tested?

existing UTs

Author: Marco Gaido

Closes #19875 from mgaido91/SPARK-22473_FOLLOWUP.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3927bb9b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3927bb9b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3927bb9b

Branch: refs/heads/master
Commit: 3927bb9b460d2d944ecf3c8552d71e8a25d29655
Parents: 4131ad0
Author: Marco Gaido
Authored: Mon Dec 4 11:07:27 2017 -0600
Committer: Sean Owen
Committed: Mon Dec 4 11:07:27 2017 -0600

--
 .../spark/sql/catalyst/expressions/PredicateSuite.scala | 8
 1 file changed, 4 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/3927bb9b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
--
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
index 865092a..0079e4e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
@@ -291,26 +291,26 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper {
   private val udt = new ExamplePointUDT

   private val smallValues =
-    Seq(1.toByte, 1.toShort, 1, 1L, Decimal(1), Array(1.toByte), new Date(2000, 1, 1),
+    Seq(1.toByte, 1.toShort, 1, 1L, Decimal(1), Array(1.toByte), Date.valueOf("2000-01-01"),
       new Timestamp(1), "a", 1f, 1d, 0f, 0d, false, Array(1L, 2L))
       .map(Literal(_)) ++ Seq(Literal.create(MyStruct(1L, "b")),
       Literal.create(MyStruct2(MyStruct(1L, "a"), Array(1, 1))),
       Literal.create(ArrayData.toArrayData(Array(1.0, 2.0)), udt))
   private val largeValues =
-    Seq(2.toByte, 2.toShort, 2, 2L, Decimal(2), Array(2.toByte), new Date(2000, 1, 2),
+    Seq(2.toByte, 2.toShort, 2, 2L, Decimal(2), Array(2.toByte), Date.valueOf("2000-01-02"),
       new Timestamp(2), "b", 2f, 2d, Float.NaN, Double.NaN, true, Array(2L, 1L))
       .map(Literal(_)) ++ Seq(Literal.create(MyStruct(2L, "b")),
       Literal.create(MyStruct2(MyStruct(1L, "a"), Array(1, 2))),
       Literal.create(ArrayData.toArrayData(Array(1.0, 3.0)), udt))

   private val equalValues1 =
-    Seq(1.toByte, 1.toShort, 1, 1L, Decimal(1), Array(1.toByte), new Date(2000, 1, 1),
+    Seq(1.toByte, 1.toShort, 1, 1L, Decimal(1), Array(1.toByte), Date.valueOf("2000-01-01"),
       new Timestamp(1), "a", 1f, 1d, Float.NaN, Double.NaN, true, Array(1L, 2L))
       .map(Literal(_)) ++ Seq(Literal.create(MyStruct(1L, "b")),
       Literal.create(MyStruct2(MyStruct(1L, "a"), Array(1, 1))),
       Literal.create(ArrayData.toArrayData(Array(1.0, 2.0)), udt))
   private val equalValues2 =
-    Seq(1.toByte, 1.toShort, 1, 1L, Decimal(1), Array(1.toByte), new Date(2000, 1, 1),
+    Seq(1.toByte, 1.toShort, 1, 1L, Decimal(1), Array(1.toByte), Date.valueOf("2000-01-01"),
       new Timestamp(1), "a", 1f, 1d, Float.NaN, Double.NaN, true, Array(1L, 2L))
       .map(Literal(_)) ++ Seq(Literal.create(MyStruct(1L, "b")),
       Literal.create(MyStruct2(MyStruct(1L, "a"), Array(1, 1))),
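For reference, the deprecated `java.sql.Date` int constructor (inherited from `java.util.Date`) takes a 1900-based year and a 0-based month, which is why `Date.valueOf` on an ISO date string is the safer replacement. A minimal illustration:

```scala
// Why the test literals changed: the deprecated constructor's arguments are offset
// (year relative to 1900, month 0-based), so a literal like new Date(2000, 1, 1) does
// not denote 2000-01-01. Date.valueOf parses the intended ISO-8601 date directly.
import java.sql.Date

object DateLiteralSketch {
  def main(args: Array[String]): Unit = {
    val d = Date.valueOf("2000-01-01") // exactly 2000-01-01, no deprecation warning
    println(d)
  }
}
```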