spark git commit: [SPARK-22682][SQL] HashExpression does not need to create global variables

2017-12-04 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 295df746e -> a8af4da12


[SPARK-22682][SQL] HashExpression does not need to create global variables

## What changes were proposed in this pull request?

It turns out that `HashExpression` can pass values around via method parameters 
when splitting code into methods, which saves some global variable slots.

This also prevents a corner case where a global variable appears in a parameter 
list, which was discovered in https://github.com/apache/spark/pull/19865
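
Roughly, the generated code now threads the running hash through each split method as a 
parameter and return value instead of a class-level field. A minimal standalone sketch of 
that pattern (this is not Spark's actual `CodegenContext` API; the names below are 
illustrative only):

```scala
// Sketch only: mimics the shape of the generated code, not Spark's codegen internals.
object SplitSketch {
  // One generated method per chunk: takes the running hash as a parameter and
  // returns the updated value, so no class-level (global) slot is needed.
  def split(chunks: Seq[String], resultVar: String, resultType: String): String = {
    val functions = chunks.zipWithIndex.map { case (body, i) =>
      s"""private $resultType computeHash_$i(InternalRow i, $resultType $resultVar) {
         |  $body
         |  return $resultVar;
         |}""".stripMargin
    }
    // The caller folds the calls together, re-assigning the local variable each time.
    val calls = chunks.indices
      .map(i => s"$resultVar = computeHash_$i(i, $resultVar);")
      .mkString("\n")
    functions.mkString("\n") + "\n\n" + calls
  }

  def main(args: Array[String]): Unit = {
    println(split(Seq("/* hash child 0 */", "/* hash child 1 */"), "value", "int"))
  }
}
```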

## How was this patch tested?

existing tests

Author: Wenchen Fan 

Closes #19878 from cloud-fan/minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8af4da1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8af4da1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8af4da1

Branch: refs/heads/master
Commit: a8af4da12ce43cd5528a53b5f7f454e9dbe71d6e
Parents: 295df74
Author: Wenchen Fan 
Authored: Tue Dec 5 12:43:05 2017 +0800
Committer: Wenchen Fan 
Committed: Tue Dec 5 12:43:05 2017 +0800

--
 .../spark/sql/catalyst/expressions/hash.scala   | 118 +--
 .../expressions/HashExpressionsSuite.scala  |  34 --
 2 files changed, 106 insertions(+), 46 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a8af4da1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
index c3289b8..d0ed2ab 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -270,17 +270,36 @@ abstract class HashExpression[E] extends Expression {
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
 ev.isNull = "false"
-val childrenHash = ctx.splitExpressions(children.map { child =>
+
+val childrenHash = children.map { child =>
   val childGen = child.genCode(ctx)
   childGen.code + ctx.nullSafeExec(child.nullable, childGen.isNull) {
 computeHash(childGen.value, child.dataType, ev.value, ctx)
   }
-})
+}
+
+val hashResultType = ctx.javaType(dataType)
+val codes = if (ctx.INPUT_ROW == null || ctx.currentVars != null) {
+  childrenHash.mkString("\n")
+} else {
+  ctx.splitExpressions(
+expressions = childrenHash,
+funcName = "computeHash",
+arguments = Seq("InternalRow" -> ctx.INPUT_ROW, hashResultType -> ev.value),
+returnType = hashResultType,
+makeSplitFunction = body =>
+  s"""
+ |$body
+ |return ${ev.value};
+   """.stripMargin,
+foldFunctions = _.map(funcCall => s"${ev.value} = $funcCall;").mkString("\n"))
+}
 
-ctx.addMutableState(ctx.javaType(dataType), ev.value)
-ev.copy(code = s"""
-  ${ev.value} = $seed;
-  $childrenHash""")
+ev.copy(code =
+  s"""
+ |$hashResultType ${ev.value} = $seed;
+ |$codes
+   """.stripMargin)
   }
 
   protected def nullSafeElementHash(
@@ -389,13 +408,21 @@ abstract class HashExpression[E] extends Expression {
   input: String,
   result: String,
   fields: Array[StructField]): String = {
-val hashes = fields.zipWithIndex.map { case (field, index) =>
+val fieldsHash = fields.zipWithIndex.map { case (field, index) =>
   nullSafeElementHash(input, index.toString, field.nullable, field.dataType, result, ctx)
 }
+val hashResultType = ctx.javaType(dataType)
 ctx.splitExpressions(
-  expressions = hashes,
-  funcName = "getHash",
-  arguments = Seq("InternalRow" -> input))
+  expressions = fieldsHash,
+  funcName = "computeHashForStruct",
+  arguments = Seq("InternalRow" -> input, hashResultType -> result),
+  returnType = hashResultType,
+  makeSplitFunction = body =>
+s"""
+   |$body
+   |return $result;
+ """.stripMargin,
+  foldFunctions = _.map(funcCall => s"$result = $funcCall;").mkString("\n"))
   }
 
   @tailrec
@@ -610,25 +637,44 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
 ev.isNull = "false"
+
 val childHash = ctx.freshName("childHash")
-val childrenHash = ctx.splitExpressions(children.map { child =>
+val childrenHash = children.map { child =>
   val childGen = child.genCode(ctx)
   val codeToComputeHash = 

spark git commit: [SPARK-22677][SQL] cleanup whole stage codegen for hash aggregate

2017-12-04 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 3887b7eef -> 295df746e


[SPARK-22677][SQL] cleanup whole stage codegen for hash aggregate

## What changes were proposed in this pull request?

The `HashAggregateExec` whole-stage codegen path is a little messy and hard to 
understand. This patch cleans it up a bit, especially the fast hash map part.

## How was this patch tested?

existing tests

Author: Wenchen Fan 

Closes #19869 from cloud-fan/hash-agg.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/295df746
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/295df746
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/295df746

Branch: refs/heads/master
Commit: 295df746ecb1def5530a044d6670b28821da89f0
Parents: 3887b7e
Author: Wenchen Fan 
Authored: Tue Dec 5 12:38:26 2017 +0800
Committer: Wenchen Fan 
Committed: Tue Dec 5 12:38:26 2017 +0800

--
 .../execution/aggregate/HashAggregateExec.scala | 402 +--
 1 file changed, 195 insertions(+), 207 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/295df746/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 9139788..26d8cd7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
-import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
+import org.apache.spark.sql.execution.vectorized.{ColumnarRow, MutableColumnarRow}
 import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
 import org.apache.spark.unsafe.KVIterator
 import org.apache.spark.util.Utils
@@ -444,6 +444,7 @@ case class HashAggregateExec(
 val funcName = ctx.freshName("doAggregateWithKeysOutput")
 val keyTerm = ctx.freshName("keyTerm")
 val bufferTerm = ctx.freshName("bufferTerm")
+val numOutput = metricTerm(ctx, "numOutputRows")
 
 val body =
 if (modes.contains(Final) || modes.contains(Complete)) {
@@ -520,6 +521,7 @@ case class HashAggregateExec(
   s"""
 private void $funcName(UnsafeRow $keyTerm, UnsafeRow $bufferTerm)
 throws java.io.IOException {
+  $numOutput.add(1);
   $body
 }
""")
@@ -549,7 +551,7 @@ case class HashAggregateExec(
 isSupported  && isNotByteArrayDecimalType
   }
 
-  private def enableTwoLevelHashMap(ctx: CodegenContext) = {
+  private def enableTwoLevelHashMap(ctx: CodegenContext): Unit = {
 if (!checkIfFastHashMapSupported(ctx)) {
if (modes.forall(mode => mode == Partial || mode == PartialMerge) && !Utils.isTesting) {
  logInfo("spark.sql.codegen.aggregate.map.twolevel.enabled is set to true, but"
@@ -560,9 +562,8 @@ case class HashAggregateExec(
 
   // This is for testing/benchmarking only.
  // We enforce to first level to be a vectorized hashmap, instead of the default row-based one.
-  sqlContext.getConf("spark.sql.codegen.aggregate.map.vectorized.enable", null) match {
-case "true" => isVectorizedHashMapEnabled = true
-case null | "" | "false" => None  }
+  isVectorizedHashMapEnabled = sqlContext.getConf(
+"spark.sql.codegen.aggregate.map.vectorized.enable", "false") == "true"
 }
   }
 
@@ -573,94 +574,84 @@ case class HashAggregateExec(
   enableTwoLevelHashMap(ctx)
 } else {
  sqlContext.getConf("spark.sql.codegen.aggregate.map.vectorized.enable", null) match {
-case "true" => logWarning("Two level hashmap is disabled but vectorized hashmap is " +
-  "enabled.")
-case null | "" | "false" => None
+case "true" =>
+  logWarning("Two level hashmap is disabled but vectorized hashmap is enabled.")
+case _ =>
   }
 }
-fastHashMapTerm = ctx.freshName("fastHashMap")
-val fastHashMapClassName = ctx.freshName("FastHashMap")
-val fastHashMapGenerator =
-  if (isVectorizedHashMapEnabled) {
-new VectorizedHashMapGenerator(ctx, aggregateExpressions,
-  fastHashMapClassName, groupingKeySchema, bufferSchema)
-  } else {
-new RowBasedHashMapGenerator(ctx, 

spark git commit: [SPARK-22665][SQL] Avoid repartitioning with empty list of expressions

2017-12-04 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 1d5597b40 -> 3887b7eef


[SPARK-22665][SQL] Avoid repartitioning with empty list of expressions

## What changes were proposed in this pull request?

Repartitioning by an empty set of expressions is currently possible, even though 
that case is not handled properly: `HashExpression` has a check that prevents running 
it on an empty set, but no equivalent check is performed when repartitioning.
This PR therefore adds a check to avoid that invalid situation.
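
For illustration, a hedged sketch of the kind of call that can hit this case, assuming the 
partitioning columns are built dynamically and end up empty (`repartition(numPartitions, 
cols: _*)` is the standard Dataset API; the fallback to round-robin partitioning is what 
the diff below adds):

```scala
import org.apache.spark.sql.{Column, SparkSession}

object RepartitionByNothing {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("SPARK-22665").getOrCreate()
    import spark.implicits._

    val df = Seq(1, 2, 3).toDF("a")
    val partitionCols: Seq[Column] = Seq.empty   // e.g. derived from user input

    // With an empty expression list this now resolves to RoundRobinPartitioning(5)
    // instead of an unhandled HashPartitioning over zero expressions.
    val repartitioned = df.repartition(5, partitionCols: _*)
    println(repartitioned.rdd.getNumPartitions)

    spark.stop()
  }
}
```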

## How was this patch tested?

added UT

Author: Marco Gaido 

Closes #19870 from mgaido91/SPARK-22665.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3887b7ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3887b7ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3887b7ee

Branch: refs/heads/master
Commit: 3887b7eef7b89d3aeecadebc0fdafa47586a232b
Parents: 1d5597b
Author: Marco Gaido 
Authored: Mon Dec 4 17:08:56 2017 -0800
Committer: gatorsmile 
Committed: Mon Dec 4 17:08:56 2017 -0800

--
 .../catalyst/plans/logical/basicLogicalOperators.scala  | 12 +++-
 .../spark/sql/catalyst/analysis/AnalysisSuite.scala |  5 -
 2 files changed, 11 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3887b7ee/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 93de7c1..ba5f97d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -22,8 +22,8 @@ import 
org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.statsEstimation._
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning,
+  RangePartitioning, RoundRobinPartitioning}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 import org.apache.spark.util.random.RandomSampler
@@ -847,14 +847,16 @@ case class RepartitionByExpression(
 "`SortOrder`, which means `RangePartitioning`, or none of them are 
`SortOrder`, which " +
 "means `HashPartitioning`. In this case we have:" +
   s"""
- |SortOrder: ${sortOrder}
- |NonSortOrder: ${nonSortOrder}
+ |SortOrder: $sortOrder
+ |NonSortOrder: $nonSortOrder
""".stripMargin)
 
 if (sortOrder.nonEmpty) {
   RangePartitioning(sortOrder.map(_.asInstanceOf[SortOrder]), numPartitions)
-} else {
+} else if (nonSortOrder.nonEmpty) {
   HashPartitioning(nonSortOrder, numPartitions)
+} else {
+  RoundRobinPartitioning(numPartitions)
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3887b7ee/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 0e2e706..109fb32 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{Cross, Inner}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning,
+  RangePartitioning, RoundRobinPartitioning}
 import org.apache.spark.sql.types._
 
 
@@ -530,6 +531,8 @@ class AnalysisSuite extends AnalysisTest with Matchers {
 checkPartitioning[RangePartitioning](numPartitions = 10,
   exprs = SortOrder('a.attr, Ascending), SortOrder('b.attr, Descending))
 
+

spark git commit: [SPARK-22626][SQL][FOLLOWUP] improve documentation and simplify test case

2017-12-04 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master e1dd03e42 -> 1d5597b40


[SPARK-22626][SQL][FOLLOWUP] improve documentation and simplify test case

## What changes were proposed in this pull request?

This PR improves the documentation on why zero `numRows` statistics are not used, 
and simplifies the test case.

The reason some Hive tables have zero `numRows` is that in Hive, when stats 
gathering is disabled, `numRows` stays zero after an INSERT command:
```
hive> create table src (key int, value string) stored as orc;
hive> desc formatted src;
Table Parameters:
COLUMN_STATS_ACCURATE   {\"BASIC_STATS\":\"true\"}
numFiles0
numRows 0
rawDataSize 0
totalSize   0
transient_lastDdlTime   1512399590

hive> set hive.stats.autogather=false;
hive> insert into src select 1, 'a';
hive> desc formatted src;
Table Parameters:
numFiles1
numRows 0
rawDataSize 0
totalSize   275
transient_lastDdlTime   1512399647

hive> insert into src select 1, 'b';
hive> desc formatted src;
Table Parameters:
numFiles2
numRows 0
rawDataSize 0
totalSize   550
transient_lastDdlTime   1512399687
```
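
For clarity, a simplified sketch of the resulting selection order (it mirrors the 
`HiveClientImpl` change in the diff below, with the `CatalogStatistics` result abbreviated 
to a tuple):

```scala
// Prefer totalSize, fall back to rawDataSize, otherwise give up and let Spark
// estimate the size later. A zero numRows is treated as "unknown" because it
// usually just means Hive stats gathering was disabled.
def chooseStats(
    totalSize: Option[BigInt],
    rawDataSize: Option[BigInt],
    numRows: Option[BigInt]): Option[(BigInt, Option[BigInt])] = {
  val rowCount = numRows.filter(_ > 0)              // ignore the bogus zero
  if (totalSize.exists(_ > 0)) {
    Some((totalSize.get, rowCount))                 // sizeInBytes = totalSize
  } else if (rawDataSize.exists(_ > 0)) {
    Some((rawDataSize.get, rowCount))               // sizeInBytes = rawDataSize
  } else {
    None                                            // estimated by other means later
  }
}
```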

## How was this patch tested?

Modified existing test.

Author: Zhenhua Wang 

Closes #19880 from wzhfy/doc_zero_rowCount.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1d5597b4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1d5597b4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1d5597b4

Branch: refs/heads/master
Commit: 1d5597b408485e41812f3645a670864ad88570a0
Parents: e1dd03e
Author: Zhenhua Wang 
Authored: Mon Dec 4 15:08:07 2017 -0800
Committer: gatorsmile 
Committed: Mon Dec 4 15:08:07 2017 -0800

--
 .../apache/spark/sql/hive/client/HiveClientImpl.scala|  8 +---
 .../org/apache/spark/sql/hive/StatisticsSuite.scala  | 11 +--
 2 files changed, 10 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1d5597b4/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 77e8360..08eb5c7 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -426,9 +426,11 @@ private[hive] class HiveClientImpl(
   // TODO: stats should include all the other two fields (`numFiles` and `numPartitions`).
   // (see StatsSetupConst in Hive)
   val stats =
-// When table is external, `totalSize` is always zero, which will influence join strategy
-// so when `totalSize` is zero, use `rawDataSize` instead. When `rawDataSize` is also zero,
-// return None. Later, we will use the other ways to estimate the statistics.
+// When table is external, `totalSize` is always zero, which will influence join strategy.
+// So when `totalSize` is zero, use `rawDataSize` instead. When `rawDataSize` is also zero,
+// return None.
+// In Hive, when statistics gathering is disabled, `rawDataSize` and `numRows` is always
+// zero after INSERT command. So they are used here only if they are larger than zero.
 if (totalSize.isDefined && totalSize.get > 0L) {
   Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount.filter(_ > 0)))
 } else if (rawDataSize.isDefined && rawDataSize.get > 0) {

http://git-wip-us.apache.org/repos/asf/spark/blob/1d5597b4/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index ee027e5..13f06a2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -1366,17 +1366,16 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
   sql("CREATE TABLE maybe_big (c1 bigint)" +
 "TBLPROPERTIES ('numRows'='0', 'rawDataSize'='600', 
'totalSize'='8')")
 
-  val relation = 

spark git commit: [SPARK-22372][CORE, YARN] Make cluster submission use SparkApplication.

2017-12-04 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master f81401e1c -> e1dd03e42


[SPARK-22372][CORE, YARN] Make cluster submission use SparkApplication.

The main goal of this change is to allow multiple cluster-mode
submissions from the same JVM, without having them end up with
mixed configuration. That is done by extending the SparkApplication
trait, and doing so was reasonably trivial for standalone and
mesos modes.

For YARN mode, there was a complication. YARN used a "SPARK_YARN_MODE"
system property to control behavior indirectly in a whole bunch of
places, mainly in the SparkHadoopUtil / YarnSparkHadoopUtil classes.
Most of the changes here are removing that.

Since we removed support for Hadoop 1.x, some methods that lived in
YarnSparkHadoopUtil can now live in SparkHadoopUtil. The remaining
methods don't need to be part of the class, and can be called directly
from the YarnSparkHadoopUtil object, so now there's a single
implementation of SparkHadoopUtil.

There were two places in the code that relied on SPARK_YARN_MODE to make decisions
about YARN-specific functionality; they now explicitly check the master from the
configuration instead:

* fetching the external shuffle service port, which can come from the YARN
  configuration.

* propagation of the authentication secret using Hadoop credentials. This also
  was cleaned up a little to not need so many methods in `SparkHadoopUtil`.

With those out of the way, actually changing the YARN client
to extend SparkApplication was easy.
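
For reference, the entry point that the cluster-mode clients now implement is roughly of 
the following shape (a sketch from memory; treat the exact trait name and signature as an 
assumption rather than the actual source):

```scala
import org.apache.spark.SparkConf

// Sketch of the entry-point shape the cluster-mode submission clients implement
// after this change: each submission gets its own SparkConf, so nothing flows
// through mutable global state such as SPARK_YARN_MODE.
trait SparkApplicationSketch {
  def start(args: Array[String], conf: SparkConf): Unit
}

// A hypothetical client: it reads everything it needs from the conf it was given,
// so two submissions from the same JVM cannot leak settings into each other.
class MyClusterClient extends SparkApplicationSketch {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    val master = conf.get("spark.master", "yarn")
    println(s"submitting ${args.mkString(" ")} to $master")
    // ... build and submit the standalone/Mesos/YARN application here ...
  }
}
```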

Tested with existing unit tests, and also by running YARN apps
with auth and kerberos both on and off in a real cluster.

Author: Marcelo Vanzin 

Closes #19631 from vanzin/SPARK-22372.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1dd03e4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1dd03e4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1dd03e4

Branch: refs/heads/master
Commit: e1dd03e42c2131b167b1e80c761291e88bfdf03f
Parents: f81401e
Author: Marcelo Vanzin 
Authored: Mon Dec 4 11:05:03 2017 -0800
Committer: Marcelo Vanzin 
Committed: Mon Dec 4 11:05:03 2017 -0800

--
 .../org/apache/spark/SecurityManager.scala  | 102 +++---
 .../scala/org/apache/spark/SparkContext.scala   |   3 -
 .../main/scala/org/apache/spark/SparkEnv.scala  |   4 +
 .../scala/org/apache/spark/deploy/Client.scala  |   8 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  48 +--
 .../org/apache/spark/deploy/SparkSubmit.scala   |  31 ++---
 .../deploy/rest/RestSubmissionClient.scala  |  31 ++---
 .../executor/CoarseGrainedExecutorBackend.scala |  10 +-
 .../scala/org/apache/spark/util/Utils.scala |   6 +-
 .../org/apache/spark/SecurityManagerSuite.scala |  31 -
 .../apache/spark/deploy/SparkSubmitSuite.scala  |   8 +-
 .../deploy/rest/StandaloneRestSubmitSuite.scala |   2 +-
 project/MimaExcludes.scala  |   6 +
 .../spark/deploy/yarn/ApplicationMaster.scala   |  54 
 .../org/apache/spark/deploy/yarn/Client.scala   |  65 +
 .../apache/spark/deploy/yarn/YarnRMClient.scala |   2 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 135 ++-
 .../yarn/security/AMCredentialRenewer.scala |   2 +-
 .../cluster/YarnClientSchedulerBackend.scala|   4 +-
 .../cluster/YarnClusterSchedulerBackend.scala   |   2 +-
 .../deploy/yarn/BaseYarnClusterSuite.scala  |   5 -
 .../apache/spark/deploy/yarn/ClientSuite.scala  |  31 +
 .../spark/deploy/yarn/YarnClusterSuite.scala|  20 ++-
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala  |  49 +--
 .../YARNHadoopDelegationTokenManagerSuite.scala |  11 +-
 25 files changed, 274 insertions(+), 396 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/core/src/main/scala/org/apache/spark/SecurityManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala 
b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 2480e56..4c1dbe3 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import java.lang.{Byte => JByte}
 import java.net.{Authenticator, PasswordAuthentication}
+import java.nio.charset.StandardCharsets.UTF_8
 import java.security.{KeyStore, SecureRandom}
 import java.security.cert.X509Certificate
 import javax.net.ssl._
@@ -26,10 +27,11 @@ import javax.net.ssl._
 import com.google.common.hash.HashCodes
 import com.google.common.io.Files
 import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
-import 

spark git commit: [SPARK-22162] Executors and the driver should use consistent JobIDs in the RDD commit protocol
2017-12-04 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 3927bb9b4 -> f81401e1c


[SPARK-22162] Executors and the driver should use consistent JobIDs in the RDD commit protocol

I have modified SparkHadoopWriter so that executors and the driver always use 
consistent JobIds during the Hadoop commit. Before SPARK-18191, Spark always used 
the rddId; it just incorrectly named the variable stageId. After SPARK-18191, it 
used the rddId as the jobId on the driver's side and the stageId as the jobId on 
the executors' side. With this change, executors and the driver consistently use 
the rddId as the jobId. Also, during the Hadoop commit protocol Spark now uses the 
actual stageId to check whether a stage can be committed, whereas before it used 
the executors' jobId for that check.
In addition to the existing unit tests, a test has been added to check whether 
executors and the driver use the same JobId. The test failed before this change 
and passes after applying this fix.
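
A simplified illustration of the invariant (this is not Spark's internal API; the names 
below are made up for the sketch): the driver derives the commit job id from the RDD id, 
every task reuses that same id, and the real stage id is only consulted for the 
commit-permission check.

```scala
object ConsistentJobIdSketch {
  final case class JobSetup(jobTrackerId: String, commitJobId: Int)

  // Driver side: the commit job id comes from the RDD, not the stage.
  def setupJob(rddId: Int, now: java.util.Date): JobSetup = {
    val trackerId = new java.text.SimpleDateFormat("yyyyMMddHHmmss").format(now)
    JobSetup(trackerId, commitJobId = rddId)
  }

  // Executor side: reuse the commitJobId handed down by the driver instead of
  // substituting the stage id, so both sides agree on which Hadoop job is committing.
  def setupTask(job: JobSetup, partitionId: Int, attemptNumber: Int): String =
    s"job=${job.commitJobId} tracker=${job.jobTrackerId} " +
      s"partition=$partitionId attempt=$attemptNumber"
}
```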

Author: Reza Safi 

Closes #19848 from rezasafi/stagerddsimple.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f81401e1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f81401e1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f81401e1

Branch: refs/heads/master
Commit: f81401e1cb39f2d6049b79dc8d61305f3371276f
Parents: 3927bb9
Author: Reza Safi 
Authored: Mon Dec 4 09:23:48 2017 -0800
Committer: Marcelo Vanzin 
Committed: Mon Dec 4 09:23:48 2017 -0800

--
 .../spark/internal/io/SparkHadoopWriter.scala   | 12 +++---
 .../spark/mapred/SparkHadoopMapRedUtil.scala|  5 ++-
 .../spark/rdd/PairRDDFunctionsSuite.scala   | 44 
 3 files changed, 53 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f81401e1/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala 
b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
index 949d8c6..abf3921 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
@@ -60,17 +60,17 @@ object SparkHadoopWriter extends Logging {
   config: HadoopWriteConfigUtil[K, V]): Unit = {
 // Extract context and configuration from RDD.
 val sparkContext = rdd.context
-val stageId = rdd.id
+val commitJobId = rdd.id
 
 // Set up a job.
 val jobTrackerId = createJobTrackerID(new Date())
-val jobContext = config.createJobContext(jobTrackerId, stageId)
+val jobContext = config.createJobContext(jobTrackerId, commitJobId)
 config.initOutputFormat(jobContext)
 
 // Assert the output format/key/value class is set in JobConf.
 config.assertConf(jobContext, rdd.conf)
 
-val committer = config.createCommitter(stageId)
+val committer = config.createCommitter(commitJobId)
 committer.setupJob(jobContext)
 
 // Try to write all RDD partitions as a Hadoop OutputFormat.
@@ -80,7 +80,7 @@ object SparkHadoopWriter extends Logging {
   context = context,
   config = config,
   jobTrackerId = jobTrackerId,
-  sparkStageId = context.stageId,
+  commitJobId = commitJobId,
   sparkPartitionId = context.partitionId,
   sparkAttemptNumber = context.attemptNumber,
   committer = committer,
@@ -102,14 +102,14 @@ object SparkHadoopWriter extends Logging {
   context: TaskContext,
   config: HadoopWriteConfigUtil[K, V],
   jobTrackerId: String,
-  sparkStageId: Int,
+  commitJobId: Int,
   sparkPartitionId: Int,
   sparkAttemptNumber: Int,
   committer: FileCommitProtocol,
   iterator: Iterator[(K, V)]): TaskCommitMessage = {
 // Set up a task.
 val taskContext = config.createTaskAttemptContext(
-  jobTrackerId, sparkStageId, sparkPartitionId, sparkAttemptNumber)
+  jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber)
 committer.setupTask(taskContext)
 
 val (outputMetrics, callback) = initHadoopOutputMetrics(context)

http://git-wip-us.apache.org/repos/asf/spark/blob/f81401e1/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala 
b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 607283a..764735d 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ 

spark git commit: [SPARK-22473][FOLLOWUP][TEST] Remove deprecated Date functions

2017-12-04 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 4131ad03f -> 3927bb9b4


[SPARK-22473][FOLLOWUP][TEST] Remove deprecated Date functions

## What changes were proposed in this pull request?

#19696 replaced the deprecated usages of `Date` and `Waiter`, but a few 
methods were missed. This PR fixes the remaining deprecated usages.
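
For context, a small example of why the replacement matters (it mirrors the test changes 
in the diff below; the deprecated `java.sql.Date(int, int, int)` constructor interprets 
its arguments as year minus 1900 and a zero-based month):

```scala
import java.sql.Date

object DateReplacementExample {
  def main(args: Array[String]): Unit = {
    val deprecated: Date = new Date(2000, 1, 1)          // actually 3900-02-01
    val preferred: Date  = Date.valueOf("2000-01-01")    // exactly 2000-01-01
    println(s"$deprecated vs $preferred")
  }
}
```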

## How was this patch tested?

existing UTs

Author: Marco Gaido 

Closes #19875 from mgaido91/SPARK-22473_FOLLOWUP.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3927bb9b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3927bb9b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3927bb9b

Branch: refs/heads/master
Commit: 3927bb9b460d2d944ecf3c8552d71e8a25d29655
Parents: 4131ad0
Author: Marco Gaido 
Authored: Mon Dec 4 11:07:27 2017 -0600
Committer: Sean Owen 
Committed: Mon Dec 4 11:07:27 2017 -0600

--
 .../spark/sql/catalyst/expressions/PredicateSuite.scala  | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3927bb9b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
index 865092a..0079e4e 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
@@ -291,26 +291,26 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper {
   private val udt = new ExamplePointUDT
 
   private val smallValues =
-Seq(1.toByte, 1.toShort, 1, 1L, Decimal(1), Array(1.toByte), new Date(2000, 1, 1),
+Seq(1.toByte, 1.toShort, 1, 1L, Decimal(1), Array(1.toByte), Date.valueOf("2000-01-01"),
   new Timestamp(1), "a", 1f, 1d, 0f, 0d, false, Array(1L, 2L))
   .map(Literal(_)) ++ Seq(Literal.create(MyStruct(1L, "b")),
   Literal.create(MyStruct2(MyStruct(1L, "a"), Array(1, 1))),
   Literal.create(ArrayData.toArrayData(Array(1.0, 2.0)), udt))
   private val largeValues =
-Seq(2.toByte, 2.toShort, 2, 2L, Decimal(2), Array(2.toByte), new Date(2000, 1, 2),
+Seq(2.toByte, 2.toShort, 2, 2L, Decimal(2), Array(2.toByte), Date.valueOf("2000-01-02"),
   new Timestamp(2), "b", 2f, 2d, Float.NaN, Double.NaN, true, Array(2L, 1L))
   .map(Literal(_)) ++ Seq(Literal.create(MyStruct(2L, "b")),
   Literal.create(MyStruct2(MyStruct(1L, "a"), Array(1, 2))),
   Literal.create(ArrayData.toArrayData(Array(1.0, 3.0)), udt))
 
   private val equalValues1 =
-Seq(1.toByte, 1.toShort, 1, 1L, Decimal(1), Array(1.toByte), new Date(2000, 1, 1),
+Seq(1.toByte, 1.toShort, 1, 1L, Decimal(1), Array(1.toByte), Date.valueOf("2000-01-01"),
   new Timestamp(1), "a", 1f, 1d, Float.NaN, Double.NaN, true, Array(1L, 2L))
   .map(Literal(_)) ++ Seq(Literal.create(MyStruct(1L, "b")),
   Literal.create(MyStruct2(MyStruct(1L, "a"), Array(1, 1))),
   Literal.create(ArrayData.toArrayData(Array(1.0, 2.0)), udt))
   private val equalValues2 =
-Seq(1.toByte, 1.toShort, 1, 1L, Decimal(1), Array(1.toByte), new Date(2000, 1, 1),
+Seq(1.toByte, 1.toShort, 1, 1L, Decimal(1), Array(1.toByte), Date.valueOf("2000-01-01"),
   new Timestamp(1), "a", 1f, 1d, Float.NaN, Double.NaN, true, Array(1L, 2L))
   .map(Literal(_)) ++ Seq(Literal.create(MyStruct(1L, "b")),
   Literal.create(MyStruct2(MyStruct(1L, "a"), Array(1, 1))),

