spark git commit: [SPARK-23466][SQL] Remove redundant null checks in generated Java code by GenerateUnsafeProjection

2018-08-31 Thread ueshin
Repository: spark
Updated Branches:
  refs/heads/master e1d72f2c0 -> c5583fdcd


[SPARK-23466][SQL] Remove redundant null checks in generated Java code by GenerateUnsafeProjection

## What changes were proposed in this pull request?

This PR addresses one of the TODOs in `GenerateUnsafeProjection`, "if the nullability of field is correct, we can use it to save null check", to simplify the generated code.
When `nullable = false` in the `DataType`, `GenerateUnsafeProjection` now omits the null-check code in the generated Java code.
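
As a rough illustration of the effect (a self-contained sketch with simplified, hypothetical types, not Spark's actual `ExprCode`/`JavaCode` codegen classes): for a field declared non-nullable, the null-check expression collapses to the constant `false`, so the generated `if (isNull)` branch can be dropped entirely.

```scala
// Hypothetical, simplified stand-in for the codegen schema/field handling.
case class FieldSchema(dataType: String, nullable: Boolean)

// Returns (isNullExpr, valueExpr) code snippets for reading field i of `input`.
def genFieldRead(input: String, i: Int, schema: FieldSchema): (String, String) = {
  val isNull =
    if (schema.nullable) s"$input.isNullAt($i)" // runtime null check still needed
    else "false"                                // statically non-null: check removed
  (isNull, s"$input.get($i)")
}

object NullCheckDemo extends App {
  println(genFieldRead("tmpInput", 0, FieldSchema("int", nullable = true)))
  println(genFieldRead("tmpInput", 1, FieldSchema("string", nullable = false)))
}
```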

## How was this patch tested?

Added new test cases to `GenerateUnsafeProjectionSuite`.

Closes #20637 from kiszk/SPARK-23466.

Authored-by: Kazuaki Ishizaki 
Signed-off-by: Takuya UESHIN 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5583fdc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5583fdc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5583fdc

Branch: refs/heads/master
Commit: c5583fdcd2289559ad98371475eb7288ced9b148
Parents: e1d72f2
Author: Kazuaki Ishizaki 
Authored: Sat Sep 1 12:19:19 2018 +0900
Committer: Takuya UESHIN 
Committed: Sat Sep 1 12:19:19 2018 +0900

--
 .../codegen/GenerateUnsafeProjection.scala  | 77 
 .../expressions/JsonExpressionsSuite.scala  |  2 +-
 .../codegen/GenerateUnsafeProjectionSuite.scala | 71 +-
 3 files changed, 117 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c5583fdc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
index 998a675..0ecd0de 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
@@ -32,6 +32,8 @@ import org.apache.spark.sql.types._
  */
 object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], 
UnsafeProjection] {
 
+  case class Schema(dataType: DataType, nullable: Boolean)
+
   /** Returns true iff we support this data type. */
   def canSupport(dataType: DataType): Boolean = 
UserDefinedType.sqlType(dataType) match {
 case NullType => true
@@ -43,19 +45,21 @@ object GenerateUnsafeProjection extends 
CodeGenerator[Seq[Expression], UnsafePro
 case _ => false
   }
 
-  // TODO: if the nullability of field is correct, we can use it to save null 
check.
   private def writeStructToBuffer(
   ctx: CodegenContext,
   input: String,
   index: String,
-  fieldTypes: Seq[DataType],
+  schemas: Seq[Schema],
   rowWriter: String): String = {
 // Puts `input` in a local variable to avoid to re-evaluate it if it's a 
statement.
 val tmpInput = ctx.freshName("tmpInput")
-val fieldEvals = fieldTypes.zipWithIndex.map { case (dt, i) =>
-  ExprCode(
-JavaCode.isNullExpression(s"$tmpInput.isNullAt($i)"),
-JavaCode.expression(CodeGenerator.getValue(tmpInput, dt, i.toString), 
dt))
+val fieldEvals = schemas.zipWithIndex.map { case (Schema(dt, nullable), i) 
=>
+  val isNull = if (nullable) {
+JavaCode.isNullExpression(s"$tmpInput.isNullAt($i)")
+  } else {
+FalseLiteral
+  }
+  ExprCode(isNull, JavaCode.expression(CodeGenerator.getValue(tmpInput, 
dt, i.toString), dt))
 }
 
 val rowWriterClass = classOf[UnsafeRowWriter].getName
@@ -70,7 +74,7 @@ object GenerateUnsafeProjection extends 
CodeGenerator[Seq[Expression], UnsafePro
|  // Remember the current cursor so that we can calculate how many 
bytes are
|  // written later.
|  final int $previousCursor = $rowWriter.cursor();
-   |  ${writeExpressionsToBuffer(ctx, tmpInput, fieldEvals, fieldTypes, 
structRowWriter)}
+   |  ${writeExpressionsToBuffer(ctx, tmpInput, fieldEvals, schemas, 
structRowWriter)}
|  $rowWriter.setOffsetAndSizeFromPreviousCursor($index, 
$previousCursor);
|}
  """.stripMargin
@@ -80,7 +84,7 @@ object GenerateUnsafeProjection extends 
CodeGenerator[Seq[Expression], UnsafePro
   ctx: CodegenContext,
   row: String,
   inputs: Seq[ExprCode],
-  inputTypes: Seq[DataType],
+  schemas: Seq[Schema],
   rowWriter: String,
   isTopLevel: Boolean = false): String = {
 val resetWriter = if (isTopLevel) {
@@ -98,8 +102,8 @@ object GenerateUnsafeProjection extends 
CodeGenerator[Seq[Expression], UnsafePro
   s"$rowWriter.resetRowWriter();"

svn commit: r29075 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_31_16_02-e1d72f2-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-31 Thread pwendell
Author: pwendell
Date: Fri Aug 31 23:16:05 2018
New Revision: 29075

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_31_16_02-e1d72f2 docs


[This commit notification would consist of 1478 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-25264][K8S] Fix comma-delineated arguments passed into PythonRunner and RRunner

2018-08-31 Thread mcheah
Repository: spark
Updated Branches:
  refs/heads/master 32da87dfa -> e1d72f2c0


[SPARK-25264][K8S] Fix comma-delineated arguments passed into PythonRunner and RRunner

## What changes were proposed in this pull request?

Fixes the issue brought up in https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/273, where the application arguments were being joined with commas, which is incorrect with respect to what PythonRunner and RRunner expect.
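
A minimal sketch of the difference (hypothetical argument values, not the actual feature-step code): a runner that splits its argument string on whitespace sees one malformed token when the arguments are joined with commas, and the intended tokens when they are joined with spaces.

```scala
object ArgDelimiterDemo extends App {
  val appArgs = Seq("--input", "/data/in", "--epochs", "10")

  val commaJoined = appArgs.mkString(",")  // "--input,/data/in,--epochs,10"
  val spaceJoined = appArgs.mkString(" ")  // "--input /data/in --epochs 10"

  // A whitespace-splitting consumer (as PythonRunner/RRunner effectively are)
  // gets one bogus token in the first case and the intended four in the second.
  println(commaJoined.split("\\s+").toSeq)
  println(spaceJoined.split("\\s+").toSeq)
}
```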

## How was this patch tested?

Modified the unit tests to cover this change.

Author: Ilan Filonenko 

Closes #22257 from ifilonenko/SPARK-25264.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1d72f2c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1d72f2c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1d72f2c

Branch: refs/heads/master
Commit: e1d72f2c07ecd6f1880299e9373daa21cb032017
Parents: 32da87d
Author: Ilan Filonenko 
Authored: Fri Aug 31 15:46:45 2018 -0700
Committer: mcheah 
Committed: Fri Aug 31 15:46:45 2018 -0700

--
 .../deploy/k8s/features/bindings/PythonDriverFeatureStep.scala   | 3 ++-
 .../spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala  | 3 ++-
 .../k8s/features/bindings/PythonDriverFeatureStepSuite.scala | 4 ++--
 .../deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala   | 4 ++--
 4 files changed, 8 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e1d72f2c/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
--
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
index c20bcac..406944a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
@@ -30,11 +30,12 @@ private[spark] class PythonDriverFeatureStep(
   override def configurePod(pod: SparkPod): SparkPod = {
 val roleConf = kubernetesConf.roleSpecificConf
 require(roleConf.mainAppResource.isDefined, "PySpark Main Resource must be 
defined")
+// Delineation is done by " " because that is input into PythonRunner
 val maybePythonArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map(
   pyArgs =>
 new EnvVarBuilder()
   .withName(ENV_PYSPARK_ARGS)
-  .withValue(pyArgs.mkString(","))
+  .withValue(pyArgs.mkString(" "))
   .build())
 val maybePythonFiles = kubernetesConf.pyFiles().map(
   // Dilineation by ":" is to append the PySpark Files to the PYTHONPATH

http://git-wip-us.apache.org/repos/asf/spark/blob/e1d72f2c/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
--
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
index b33b86e..11b09b3 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
@@ -30,11 +30,12 @@ private[spark] class RDriverFeatureStep(
   override def configurePod(pod: SparkPod): SparkPod = {
 val roleConf = kubernetesConf.roleSpecificConf
 require(roleConf.mainAppResource.isDefined, "R Main Resource must be 
defined")
+// Delineation is done by " " because that is input into RRunner
 val maybeRArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map(
   rArgs =>
 new EnvVarBuilder()
   .withName(ENV_R_ARGS)
-  .withValue(rArgs.mkString(","))
+  .withValue(rArgs.mkString(" "))
   .build())
 val envSeq =
   Seq(new EnvVarBuilder()

http://git-wip-us.apache.org/repos/asf/spark/blob/e1d72f2c/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
--
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
 

svn commit: r29067 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_31_12_02-32da87d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-31 Thread pwendell
Author: pwendell
Date: Fri Aug 31 19:16:27 2018
New Revision: 29067

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_31_12_02-32da87d docs


[This commit notification would consist of 1478 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-25286][CORE] Removing the dangerous parmap

2018-08-31 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 7fc8881b0 -> 32da87dfa


[SPARK-25286][CORE] Removing the dangerous parmap

## What changes were proposed in this pull request?

I propose to remove one of the `parmap` methods, the overload that accepts an execution context as a parameter. The method should be removed to eliminate deadlocks that can occur if `parmap` is called recursively on thread pools restricted by size.
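
As a sketch of the replacement pattern used in `UnionRDD` (illustrative only; it mirrors the imports in the diff, and on Scala 2.13+ parallel collections live in the separate scala-parallel-collections module with `java.util.concurrent.ForkJoinPool`): a parallel collection with a bounded `ForkJoinTaskSupport` instead of the removed `parmap` overload.

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

object ParListingDemo extends App {
  // Bounded pool shared for parallel evaluation, mirroring UnionRDD's task support.
  val taskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))

  val inputs = (1 to 100).toVector
  val par = inputs.par
  par.tasksupport = taskSupport

  // The per-element work (here just squaring) runs on the bounded pool;
  // .seq brings the result back to a sequential collection.
  val total = par.map(x => x.toLong * x).seq.sum
  println(total)
}
```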

Closes #22292 from MaxGekk/remove-overloaded-parmap.

Authored-by: Maxim Gekk 
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32da87df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32da87df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32da87df

Branch: refs/heads/master
Commit: 32da87dfa451fff677ed9316f740be2abdbff6a4
Parents: 7fc8881
Author: Maxim Gekk 
Authored: Fri Aug 31 10:43:30 2018 -0700
Committer: Xiao Li 
Committed: Fri Aug 31 10:43:30 2018 -0700

--
 .../scala/org/apache/spark/rdd/UnionRDD.scala   | 17 ++-
 .../org/apache/spark/util/ThreadUtils.scala | 32 +++-
 .../streaming/util/FileBasedWriteAheadLog.scala |  5 +--
 3 files changed, 16 insertions(+), 38 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/32da87df/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 4b6f732..60e383a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -20,13 +20,12 @@ package org.apache.spark.rdd
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext
+import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, 
TaskContext}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.ThreadUtils.parmap
 import org.apache.spark.util.Utils
 
 /**
@@ -60,7 +59,8 @@ private[spark] class UnionPartition[T: ClassTag](
 }
 
 object UnionRDD {
-  private[spark] lazy val threadPool = new ForkJoinPool(8)
+  private[spark] lazy val partitionEvalTaskSupport =
+new ForkJoinTaskSupport(new ForkJoinPool(8))
 }
 
 @DeveloperApi
@@ -74,13 +74,14 @@ class UnionRDD[T: ClassTag](
 rdds.length > conf.getInt("spark.rdd.parallelListingThreshold", 10)
 
   override def getPartitions: Array[Partition] = {
-val partitionLengths = if (isPartitionListingParallel) {
-  implicit val ec = ExecutionContext.fromExecutor(UnionRDD.threadPool)
-  parmap(rdds)(_.partitions.length)
+val parRDDs = if (isPartitionListingParallel) {
+  val parArray = rdds.par
+  parArray.tasksupport = UnionRDD.partitionEvalTaskSupport
+  parArray
 } else {
-  rdds.map(_.partitions.length)
+  rdds
 }
-val array = new Array[Partition](partitionLengths.sum)
+val array = new Array[Partition](parRDDs.map(_.partitions.length).seq.sum)
 var pos = 0
 for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
   array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)

http://git-wip-us.apache.org/repos/asf/spark/blob/32da87df/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index f0e5add..cb0c205 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -284,36 +284,12 @@ private[spark] object ThreadUtils {
 try {
   implicit val ec = ExecutionContext.fromExecutor(pool)
 
-  parmap(in)(f)
+  val futures = in.map(x => Future(f(x)))
+  val futureSeq = Future.sequence(futures)
+
+  awaitResult(futureSeq, Duration.Inf)
 } finally {
   pool.shutdownNow()
 }
   }
-
-  /**
-   * Transforms input collection by applying the given function to each 
element in parallel fashion.
-   * Comparing to the map() method of Scala parallel collections, this method 
can be interrupted
-   * at any time. This is useful on canceling of task execution, for example.
-   *
-   * @param in - the input collection which should be transformed in parallel.
-   * @param f - the lambda function will be applied to each element of `in`.
-   * @param ec - an execution context for parallel applying of the given 
function `f`.
-   * @tparam I - the 

spark git commit: [SPARK-25296][SQL][TEST] Create ExplainSuite

2018-08-31 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 339859c4e -> 7fc8881b0


[SPARK-25296][SQL][TEST] Create ExplainSuite

## What changes were proposed in this pull request?
Move the output verification of Explain test cases to a new suite ExplainSuite.

## How was this patch tested?
N/A

Closes #22300 from gatorsmile/test3200.

Authored-by: Xiao Li 
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7fc8881b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7fc8881b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7fc8881b

Branch: refs/heads/master
Commit: 7fc8881b0fbc3d85a524e0454fa89925e92c4fa4
Parents: 339859c
Author: Xiao Li 
Authored: Fri Aug 31 08:47:20 2018 -0700
Committer: Xiao Li 
Committed: Fri Aug 31 08:47:20 2018 -0700

--
 .../org/apache/spark/sql/DataFrameSuite.scala   |  9 ---
 .../apache/spark/sql/DatasetCacheSuite.scala| 11 
 .../org/apache/spark/sql/DatasetSuite.scala | 10 
 .../org/apache/spark/sql/ExplainSuite.scala | 58 
 4 files changed, 58 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7fc8881b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 6f5c730..d43fcf3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2553,13 +2553,4 @@ class DataFrameSuite extends QueryTest with 
SharedSQLContext {
 }
   }
 
-  test("SPARK-23034 show rdd names in RDD scan nodes") {
-val rddWithName = spark.sparkContext.parallelize(Row(1, "abc") :: 
Nil).setName("testRdd")
-val df2 = spark.createDataFrame(rddWithName, StructType.fromDDL("c0 int, 
c1 string"))
-val output2 = new java.io.ByteArrayOutputStream()
-Console.withOut(output2) {
-  df2.explain(extended = false)
-}
-assert(output2.toString.contains("Scan ExistingRDD testRdd"))
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7fc8881b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index 44177e3..5c6a021 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -206,15 +206,4 @@ class DatasetCacheSuite extends QueryTest with 
SharedSQLContext with TimeLimits
 // first time use, load cache
 checkDataset(df5, Row(10))
   }
-
-  test("SPARK-24850 InMemoryRelation string representation does not include 
cached plan") {
-val df = Seq(1).toDF("a").cache()
-val outputStream = new java.io.ByteArrayOutputStream()
-Console.withOut(outputStream) {
-  df.explain(false)
-}
-assert(outputStream.toString.replaceAll("#\\d+", "#x").contains(
-  "InMemoryRelation [a#x], StorageLevel(disk, memory, deserialized, 1 
replicas)"
-))
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7fc8881b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 6069f28..cf24eba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1498,16 +1498,6 @@ class DatasetSuite extends QueryTest with 
SharedSQLContext {
   df.where($"city".contains(new java.lang.Character('A'))),
   Seq(Row("Amsterdam")))
   }
-
-  test("SPARK-23034 show rdd names in RDD scan nodes") {
-val rddWithName = spark.sparkContext.parallelize(SingleData(1) :: 
Nil).setName("testRdd")
-val df = spark.createDataFrame(rddWithName)
-val output = new java.io.ByteArrayOutputStream()
-Console.withOut(output) {
-  df.explain(extended = false)
-}
-assert(output.toString.contains("Scan testRdd"))
-  }
 }
 
 case class TestDataUnion(x: Int, y: Int, z: Int)

http://git-wip-us.apache.org/repos/asf/spark/blob/7fc8881b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala 

svn commit: r29060 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_31_08_02-339859c-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-31 Thread pwendell
Author: pwendell
Date: Fri Aug 31 15:17:20 2018
New Revision: 29060

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_31_08_02-339859c docs


[This commit notification would consist of 1478 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-25261][MINOR][DOC] update the description for spark.executor|driver.memory in configuration.md

2018-08-31 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 8d9495a8f -> 339859c4e


[SPARK-25261][MINOR][DOC] update the description for spark.executor|driver.memory in configuration.md

## What changes were proposed in this pull request?

As described in [SPARK-25261](https://issues.apache.org/jira/projects/SPARK/issues/SPARK-25261), the unit of spark.executor.memory and spark.driver.memory is parsed as bytes in some cases if no unit is specified, while in https://spark.apache.org/docs/latest/configuration.html#application-properties they are described as MiB, which may lead to misunderstandings.
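
A small, hypothetical example of the documented behaviour: always pass a JVM-style memory string with an explicit unit suffix, since a bare number may be interpreted as bytes on some code paths.

```scala
import org.apache.spark.SparkConf

object MemoryConfDemo extends App {
  val conf = new SparkConf()
    .set("spark.driver.memory", "2g")      // 2 GiB, explicit "g" suffix
    .set("spark.executor.memory", "512m")  // 512 MiB, explicit "m" suffix

  println(conf.get("spark.driver.memory"))
  println(conf.get("spark.executor.memory"))
}
```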

## How was this patch tested?

N/A

Closes #22252 from ivoson/branch-correct-configuration.

Lead-authored-by: huangtengfei02 
Co-authored-by: Huang Tengfei 
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/339859c4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/339859c4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/339859c4

Branch: refs/heads/master
Commit: 339859c4e4b27726ba8ce9a64451a981ef74de0c
Parents: 8d9495a
Author: huangtengfei02 
Authored: Fri Aug 31 09:06:38 2018 -0500
Committer: Sean Owen 
Committed: Fri Aug 31 09:06:38 2018 -0500

--
 docs/configuration.md | 9 +
 1 file changed, 5 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/339859c4/docs/configuration.md
--
diff --git a/docs/configuration.md b/docs/configuration.md
index b5ff426..f344bcd 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -152,8 +152,9 @@ of the most common options to set are:
   spark.driver.memory
   1g
   
-Amount of memory to use for the driver process, i.e. where SparkContext is 
initialized, in MiB 
-unless otherwise specified (e.g. 1g, 2g).
+Amount of memory to use for the driver process, i.e. where SparkContext is 
initialized, in the
+same format as JVM memory strings with a size unit suffix ("k", "m", "g" 
or "t")
+(e.g. 512m, 2g).
 
 Note: In client mode, this config must not be set through the 
SparkConf
 directly in your application, because the driver JVM has already started 
at that point.
@@ -175,8 +176,8 @@ of the most common options to set are:
   spark.executor.memory
   1g
   
-Amount of memory to use per executor process, in MiB unless otherwise 
specified.
-(e.g. 2g, 8g).
+Amount of memory to use per executor process, in the same format as JVM 
memory strings with
+a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 
2g).
   
 
 





spark git commit: [SPARK-25207][SQL] Case-insensitve field resolution for filter pushdown when reading Parquet

2018-08-31 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 515708d5f -> 8d9495a8f


[SPARK-25207][SQL] Case-insensitve field resolution for filter pushdown when reading Parquet

## What changes were proposed in this pull request?

Currently, filter pushdown does not work if the Parquet schema and the Hive metastore schema are in different letter cases, even when spark.sql.caseSensitive is false.

Like the below case:
```scala
spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 8 * 1024 * 1024)
spark.range(1, 40 * 1024 * 1024, 1, 1).sortWithinPartitions("id").write.parquet("/tmp/t")
sql("CREATE TABLE t (ID LONG) USING parquet LOCATION '/tmp/t'")
sql("select * from t where id < 100L").write.csv("/tmp/id")
```

Although the filter "ID < 100L" is generated by Spark, it is not actually pushed down into Parquet, so Spark still does a full table scan when reading.
This PR provides a case-insensitive field resolution to make it work.

Before - "ID < 100L" fail to pushedown:
https://user-images.githubusercontent.com/2989575/44530558-40ef8b00-a721-11e8-8abc-7f97671590d3.png;>
After - "ID < 100L" pushedown sucessfully:
https://user-images.githubusercontent.com/2989575/44530567-44831200-a721-11e8-8634-e9f664b33d39.png;>
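
A simplified sketch of the resolution idea (not the actual `ParquetFilters` code): when case sensitivity is off, match the pushed filter's field name against the Parquet schema by lower-casing both sides, and refuse to push down when the lower-cased name is ambiguous.

```scala
import java.util.Locale

object FieldResolutionDemo extends App {
  val parquetFields = Seq("id", "Name", "name")

  def resolve(filterField: String, caseSensitive: Boolean): Option[String] = {
    if (caseSensitive) {
      parquetFields.find(_ == filterField)
    } else {
      val matches = parquetFields.filter(
        _.toLowerCase(Locale.ROOT) == filterField.toLowerCase(Locale.ROOT))
      // An ambiguous match (e.g. both "Name" and "name") cannot be pushed down safely.
      if (matches.size == 1) matches.headOption else None
    }
  }

  println(resolve("ID", caseSensitive = false))   // Some(id): pushdown possible
  println(resolve("NAME", caseSensitive = false)) // None: ambiguous, no pushdown
  println(resolve("ID", caseSensitive = true))    // None: exact name not present
}
```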

## How was this patch tested?

Added UTs.

Closes #22197 from yucai/SPARK-25207.

Authored-by: yucai 
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d9495a8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d9495a8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d9495a8

Branch: refs/heads/master
Commit: 8d9495a8f1e64dbc42c3741f9bcbd4893ce3f0e9
Parents: 515708d
Author: yucai 
Authored: Fri Aug 31 19:24:09 2018 +0800
Committer: Wenchen Fan 
Committed: Fri Aug 31 19:24:09 2018 +0800

--
 .../datasources/parquet/ParquetFileFormat.scala |   3 +-
 .../datasources/parquet/ParquetFilters.scala|  90 ++-
 .../parquet/ParquetFilterSuite.scala| 115 ++-
 3 files changed, 179 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8d9495a8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index d7eb143..ea4f159 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -347,6 +347,7 @@ class ParquetFileFormat
 val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
 val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
 val pushDownInFilterThreshold = 
sqlConf.parquetFilterPushDownInFilterThreshold
+val isCaseSensitive = sqlConf.caseSensitiveAnalysis
 
 (file: PartitionedFile) => {
   assert(file.partitionValues.numFields == partitionSchema.size)
@@ -372,7 +373,7 @@ class ParquetFileFormat
   val pushed = if (enableParquetFilterPushDown) {
 val parquetSchema = footerFileMetaData.getSchema
 val parquetFilters = new ParquetFilters(pushDownDate, 
pushDownTimestamp, pushDownDecimal,
-  pushDownStringStartWith, pushDownInFilterThreshold)
+  pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
 filters
   // Collects all converted Parquet filter predicates. Notice that not 
all predicates can be
   // converted (`ParquetFilters.createFilter` returns an `Option`). 
That's why a `flatMap`

http://git-wip-us.apache.org/repos/asf/spark/blob/8d9495a8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 58b4a76..0c286de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, 
Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Timestamp}
+import java.util.Locale
 
 import scala.collection.JavaConverters.asScalaBufferConverter
 
@@ 

spark git commit: [SPARK-25183][SQL] Spark HiveServer2 to use Spark ShutdownHookManager

2018-08-31 Thread jshao
Repository: spark
Updated Branches:
  refs/heads/master aa70a0a1a -> 515708d5f


[SPARK-25183][SQL] Spark HiveServer2 to use Spark ShutdownHookManager

## What changes were proposed in this pull request?

Switch `org.apache.hive.service.server.HiveServer2` to register its shutdown callback with Spark's `ShutdownHookManager`, rather than directly with the Java Runtime shutdown hook.

This avoids race conditions during shutdown where the filesystem is shut down before the flush/write/rename of the event log is completed, particularly on object stores where the write and rename can be slow.
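
A Scala-side sketch of the pattern (the `org.apache.spark.util.ShutdownHookManager` object is `private[spark]`, so this only compiles inside the Spark project; the service class below is a hypothetical stand-in): register the stop callback as a Spark shutdown hook so it runs before the Hadoop FileSystem hook at priority 10.

```scala
import org.apache.spark.util.ShutdownHookManager

class SomeService {                 // hypothetical stand-in for HiveServer2
  def stop(): Unit = println("service stopped")
}

object ShutdownHookSketch {
  def register(service: SomeService): AnyRef =
    ShutdownHookManager.addShutdownHook { () =>
      // Swallow failures so a broken stop() does not abort the rest of the
      // shutdown hook sequence.
      try service.stop()
      catch { case e: Throwable => println(s"Ignoring exception while stopping: $e") }
    }
}
```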

## How was this patch tested?

There's no explicit unit test for this, which is consistent with every other shutdown hook in the codebase.

* There's an implicit test when the scalatest process is halted.
* More manual/integration testing is needed.

HADOOP-15679 has added the ability to explicitly execute the hadoop shutdown 
hook sequence which spark uses; that could be stabilized for testing if 
desired, after which all the spark hooks could be tested. Until then: external 
system tests only.

Author: Steve Loughran 

Closes #22186 from steveloughran/BUG/SPARK-25183-shutdown.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/515708d5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/515708d5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/515708d5

Branch: refs/heads/master
Commit: 515708d5f33d5acdb4206c626192d1838f8e691f
Parents: aa70a0a
Author: Steve Loughran 
Authored: Fri Aug 31 14:45:29 2018 +0800
Committer: jerryshao 
Committed: Fri Aug 31 14:45:29 2018 +0800

--
 .../apache/hive/service/server/HiveServer2.java | 30 ++--
 1 file changed, 22 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/515708d5/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java
--
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java
index 9bf96cf..a30be2b 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java
@@ -20,6 +20,9 @@ package org.apache.hive.service.server;
 
 import java.util.Properties;
 
+import scala.runtime.AbstractFunction0;
+import scala.runtime.BoxedUnit;
+
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Option;
@@ -39,6 +42,8 @@ import 
org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
 import org.apache.hive.service.cli.thrift.ThriftCLIService;
 import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
 
+import org.apache.spark.util.ShutdownHookManager;
+
 /**
  * HiveServer2.
  *
@@ -67,13 +72,23 @@ public class HiveServer2 extends CompositeService {
 super.init(hiveConf);
 
 // Add a shutdown hook for catching SIGTERM & SIGINT
-final HiveServer2 hiveServer2 = this;
-Runtime.getRuntime().addShutdownHook(new Thread() {
-  @Override
-  public void run() {
-hiveServer2.stop();
-  }
-});
+// this must be higher than the Hadoop Filesystem priority of 10,
+// which the default priority is.
+// The signature of the callback must match that of a scala () -> Unit
+// function
+ShutdownHookManager.addShutdownHook(
+new AbstractFunction0() {
+  public BoxedUnit apply() {
+try {
+  LOG.info("Hive Server Shutdown hook invoked");
+  stop();
+} catch (Throwable e) {
+  LOG.warn("Ignoring Exception while stopping Hive Server from 
shutdown hook",
+  e);
+}
+return BoxedUnit.UNIT;
+  }
+});
   }
 
   public static boolean isHTTPTransportMode(HiveConf hiveConf) {
@@ -95,7 +110,6 @@ public class HiveServer2 extends CompositeService {
   @Override
   public synchronized void stop() {
 LOG.info("Shutting down HiveServer2");
-HiveConf hiveConf = this.getHiveConf();
 super.stop();
   }
 





spark git commit: [SPARK-25288][TESTS] Fix flaky Kafka transaction tests

2018-08-31 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master f29c2b528 -> aa70a0a1a


[SPARK-25288][TESTS] Fix flaky Kafka transaction tests

## What changes were proposed in this pull request?

Here are the failures:

http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaRelationSuite_name=read+Kafka+transactional+messages%3A+read_committed
http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV1SourceSuite_name=read+Kafka+transactional+messages%3A+read_committed
http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite_name=read+Kafka+transactional+messages%3A+read_committed

I found that the Kafka consumer may not see the committed messages for a short time. This PR adds a new method `waitUntilOffsetAppears` and uses it to make sure the consumer can see a specified offset before checking the result.
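
A rough sketch of what such a wait can look like (not the actual `KafkaTestUtils` helper; the method below is a hypothetical stand-in using the plain Kafka consumer API): poll the broker for the partition's latest offset until it reaches the expected value before checking results.

```scala
import java.util.Collections

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object OffsetWait {
  // Block until the latest offset of `tp` (as seen by `consumer`) reaches
  // `expectedOffset`, or fail after `timeoutMs`.
  def waitUntilOffsetAppears(
      consumer: KafkaConsumer[_, _],
      tp: TopicPartition,
      expectedOffset: Long,
      timeoutMs: Long = 60000L): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMs
    var latest = -1L
    while (latest < expectedOffset && System.currentTimeMillis() < deadline) {
      latest = consumer.endOffsets(Collections.singleton(tp)).get(tp)
      if (latest < expectedOffset) Thread.sleep(100)
    }
    require(latest >= expectedOffset,
      s"Offset $expectedOffset did not appear in $tp within ${timeoutMs}ms")
  }
}
```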

## How was this patch tested?

Jenkins

Closes #22293 from zsxwing/SPARK-25288.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa70a0a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa70a0a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa70a0a1

Branch: refs/heads/master
Commit: aa70a0a1a434e8a4b1d4dde00e20b865bb70b8dd
Parents: f29c2b5
Author: Shixiong Zhu 
Authored: Thu Aug 30 23:23:11 2018 -0700
Committer: Shixiong Zhu 
Committed: Thu Aug 30 23:23:11 2018 -0700

--
 .../kafka010/KafkaMicroBatchSourceSuite.scala   | 34 
 .../spark/sql/kafka010/KafkaRelationSuite.scala |  7 
 .../spark/sql/kafka010/KafkaTestUtils.scala | 10 ++
 3 files changed, 37 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/aa70a0a1/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
--
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index eb66cca..78249f7 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -160,14 +160,18 @@ abstract class KafkaSourceTest extends StreamTest with 
SharedSQLContext with Kaf
   }
 
   object WithOffsetSync {
-def apply(topic: String)(func: () => Unit): StreamAction = {
+/**
+ * Run `func` to write some Kafka messages and wait until the latest 
offset of the given
+ * `TopicPartition` is not less than `expectedOffset`.
+ */
+def apply(
+topicPartition: TopicPartition,
+expectedOffset: Long)(func: () => Unit): StreamAction = {
   Execute("Run Kafka Producer")(_ => {
 func()
 // This is a hack for the race condition that the committed message 
may be not visible to
 // consumer for a short time.
-// Looks like after the following call returns, the consumer can 
always read the committed
-// messages.
-testUtils.getLatestOffsets(Set(topic))
+testUtils.waitUntilOffsetAppears(topicPartition, expectedOffset)
   })
 }
   }
@@ -652,13 +656,14 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
   }
 }
 
+val topicPartition = new TopicPartition(topic, 0)
 // The message values are the same as their offsets to make the test easy 
to follow
 testUtils.withTranscationalProducer { producer =>
   testStream(mapped)(
 StartStream(ProcessingTime(100), clock),
 waitUntilBatchProcessed,
 CheckAnswer(),
-WithOffsetSync(topic) { () =>
+WithOffsetSync(topicPartition, expectedOffset = 5) { () =>
   // Send 5 messages. They should be visible only after being 
committed.
   producer.beginTransaction()
   (0 to 4).foreach { i =>
@@ -669,7 +674,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
 waitUntilBatchProcessed,
 // Should not see any uncommitted messages
 CheckNewAnswer(),
-WithOffsetSync(topic) { () =>
+WithOffsetSync(topicPartition, expectedOffset = 6) { () =>
   producer.commitTransaction()
 },
 AdvanceManualClock(100),
@@ -678,7 +683,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
 AdvanceManualClock(100),
 waitUntilBatchProcessed,
 CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a