spark git commit: [SPARK-23778][CORE] Avoid unneeded shuffle when union gets an empty RDD

2018-06-19 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master bc0498d58 -> bc111463a


[SPARK-23778][CORE] Avoid unneeded shuffle when union gets an empty RDD

## What changes were proposed in this pull request?

When `union` is invoked on several RDDs and one of them is an empty RDD, the
result of the operation is a plain `UnionRDD`. This causes an unneeded extra
shuffle even when all the non-empty RDDs share the same partitioner.

The PR ignores incoming empty RDDs in the union method, so the partitioner-aware path can still be chosen.
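
A minimal usage sketch of the behavior this change targets (assumes a local `SparkContext` named `sc`; the variable names are illustrative):

```scala
import org.apache.spark.HashPartitioner

// One hash-partitioned pair RDD plus an empty RDD of the same element type.
val partitioned = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(new HashPartitioner(4))
val empty = sc.emptyRDD[(Int, String)]

// With this patch the empty RDD is filtered out, so the union keeps the
// existing HashPartitioner and downstream stages need no extra shuffle.
val unioned = sc.union(Seq(partitioned, empty))
println(unioned.partitioner) // expected to be Some(<the HashPartitioner>) after the change
```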

## How was this patch tested?

Added a unit test.

Author: Marco Gaido 

Closes #21333 from mgaido91/SPARK-23778.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc111463
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc111463
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc111463

Branch: refs/heads/master
Commit: bc111463a766a5619966a282fbe0fec991088ceb
Parents: bc0498d
Author: Marco Gaido 
Authored: Tue Jun 19 22:29:00 2018 -0700
Committer: Wenchen Fan 
Committed: Tue Jun 19 22:29:00 2018 -0700

--
 .../main/scala/org/apache/spark/SparkContext.scala|  9 +
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala| 14 +-
 2 files changed, 18 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bc111463/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5e85956..74bfb5d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1306,11 +1306,12 @@ class SparkContext(config: SparkConf) extends Logging {
 
   /** Build the union of a list of RDDs. */
   def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope {
-    val partitioners = rdds.flatMap(_.partitioner).toSet
-    if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
-      new PartitionerAwareUnionRDD(this, rdds)
+    val nonEmptyRdds = rdds.filter(!_.partitions.isEmpty)
+    val partitioners = nonEmptyRdds.flatMap(_.partitioner).toSet
+    if (nonEmptyRdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
+      new PartitionerAwareUnionRDD(this, nonEmptyRdds)
     } else {
-      new UnionRDD(this, rdds)
+      new UnionRDD(this, nonEmptyRdds)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bc111463/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 191c612..5148ce0 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -154,6 +154,16 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
 }
   }
 
+  test("SPARK-23778: empty RDD in union should not produce a UnionRDD") {
+val rddWithPartitioner = sc.parallelize(Seq(1 -> true)).partitionBy(new 
HashPartitioner(1))
+val emptyRDD = sc.emptyRDD[(Int, Boolean)]
+val unionRDD = sc.union(emptyRDD, rddWithPartitioner)
+assert(unionRDD.isInstanceOf[PartitionerAwareUnionRDD[_]])
+val unionAllEmptyRDD = sc.union(emptyRDD, emptyRDD)
+assert(unionAllEmptyRDD.isInstanceOf[UnionRDD[_]])
+assert(unionAllEmptyRDD.collect().isEmpty)
+  }
+
   test("partitioner aware union") {
 def makeRDDWithPartitioner(seq: Seq[Int]): RDD[Int] = {
   sc.makeRDD(seq, 1)
@@ -1047,7 +1057,9 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
   private class CyclicalDependencyRDD[T: ClassTag] extends RDD[T](sc, Nil) {
     private val mutableDependencies: ArrayBuffer[Dependency[_]] = ArrayBuffer.empty
     override def compute(p: Partition, c: TaskContext): Iterator[T] = Iterator.empty
-    override def getPartitions: Array[Partition] = Array.empty
+    override def getPartitions: Array[Partition] = Array(new Partition {
+      override def index: Int = 0
+    })
     override def getDependencies: Seq[Dependency[_]] = mutableDependencies
     def addDependency(dep: Dependency[_]) {
       mutableDependencies += dep





svn commit: r27571 - in /dev/spark/2.3.2-SNAPSHOT-2018_06_19_18_01-d687d97-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-06-19 Thread pwendell
Author: pwendell
Date: Wed Jun 20 01:16:54 2018
New Revision: 27571

Log:
Apache Spark 2.3.2-SNAPSHOT-2018_06_19_18_01-d687d97 docs


[This commit notification would consist of 1443 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




svn commit: r27569 - in /dev/spark/2.4.0-SNAPSHOT-2018_06_19_16_01-bc0498d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-06-19 Thread pwendell
Author: pwendell
Date: Tue Jun 19 23:15:41 2018
New Revision: 27569

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_06_19_16_01-bc0498d docs


[This commit notification would consist of 1468 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-24583][SQL] Wrong schema type in InsertIntoDataSourceCommand

2018-06-19 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 2cb976355 -> bc0498d58


[SPARK-24583][SQL] Wrong schema type in InsertIntoDataSourceCommand

## What changes were proposed in this pull request?

Change the insert input schema type from "insertRelationType" to
"insertRelationType.asNullable", in order to avoid the target relation's nullability being overridden.

## How was this patch tested?

Added one test in InsertSuite.

Author: Maryann Xue 

Closes #21585 from maryannxue/spark-24583.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc0498d5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc0498d5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc0498d5

Branch: refs/heads/master
Commit: bc0498d5820ded2b428277e396502e74ef0ce36d
Parents: 2cb9763
Author: Maryann Xue 
Authored: Tue Jun 19 15:27:20 2018 -0700
Committer: Xiao Li 
Committed: Tue Jun 19 15:27:20 2018 -0700

--
 .../InsertIntoDataSourceCommand.scala   |  5 +-
 .../apache/spark/sql/sources/InsertSuite.scala  | 51 +++-
 2 files changed, 52 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bc0498d5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index a813829..80d7608 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -38,9 +38,8 @@ case class InsertIntoDataSourceCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
     val data = Dataset.ofRows(sparkSession, query)
-    // Apply the schema of the existing table to the new data.
-    val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
-    relation.insert(df, overwrite)
+    // Data has been casted to the target relation's schema by the PreprocessTableInsertion rule.
+    relation.insert(data, overwrite)
 
     // Re-cache all cached plans(including this relation itself, if it's cached) that refer to this
     // data source relation.

http://git-wip-us.apache.org/repos/asf/spark/blob/bc0498d5/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index fef01c8..438d5d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -20,12 +20,36 @@ package org.apache.spark.sql.sources
 import java.io.File
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+class SimpleInsertSource extends SchemaRelationProvider {
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String],
+  schema: StructType): BaseRelation = {
+SimpleInsert(schema)(sqlContext.sparkSession)
+  }
+}
+
+case class SimpleInsert(userSpecifiedSchema: StructType)(@transient val sparkSession: SparkSession)
+  extends BaseRelation with InsertableRelation {
+
+  override def sqlContext: SQLContext = sparkSession.sqlContext
+
+  override def schema: StructType = userSpecifiedSchema
+
+  override def insert(input: DataFrame, overwrite: Boolean): Unit = {
+input.collect
+  }
+}
+
 class InsertSuite extends DataSourceTest with SharedSQLContext {
   import testImplicits._
 
@@ -520,4 +544,29 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
   }
 }
   }
+
+  test("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand") {
+withTable("test_table") {
+  val schema = new StructType()
+.add("i", LongType, false)
+.add("s", StringType, false)
+  val newTable = CatalogTable(
+identifier = TableIdentifier("test_table", 

spark git commit: [SPARK-24583][SQL] Wrong schema type in InsertIntoDataSourceCommand

2018-06-19 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 50cdb4138 -> d687d97b1


[SPARK-24583][SQL] Wrong schema type in InsertIntoDataSourceCommand

## What changes were proposed in this pull request?

Change the insert input schema type from "insertRelationType" to
"insertRelationType.asNullable", in order to avoid the target relation's nullability being overridden.

## How was this patch tested?

Added one test in InsertSuite.

Author: Maryann Xue 

Closes #21585 from maryannxue/spark-24583.

(cherry picked from commit bc0498d5820ded2b428277e396502e74ef0ce36d)
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d687d97b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d687d97b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d687d97b

Branch: refs/heads/branch-2.3
Commit: d687d97b116beafa7f4375f1876049f5da4f5ba7
Parents: 50cdb41
Author: Maryann Xue 
Authored: Tue Jun 19 15:27:20 2018 -0700
Committer: Xiao Li 
Committed: Tue Jun 19 15:27:30 2018 -0700

--
 .../InsertIntoDataSourceCommand.scala   |  5 +-
 .../apache/spark/sql/sources/InsertSuite.scala  | 51 +++-
 2 files changed, 52 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d687d97b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index a813829..80d7608 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -38,9 +38,8 @@ case class InsertIntoDataSourceCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
     val data = Dataset.ofRows(sparkSession, query)
-    // Apply the schema of the existing table to the new data.
-    val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
-    relation.insert(df, overwrite)
+    // Data has been casted to the target relation's schema by the PreprocessTableInsertion rule.
+    relation.insert(data, overwrite)
 
     // Re-cache all cached plans(including this relation itself, if it's cached) that refer to this
     // data source relation.

http://git-wip-us.apache.org/repos/asf/spark/blob/d687d97b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index fef01c8..438d5d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -20,12 +20,36 @@ package org.apache.spark.sql.sources
 import java.io.File
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+class SimpleInsertSource extends SchemaRelationProvider {
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String],
+  schema: StructType): BaseRelation = {
+SimpleInsert(schema)(sqlContext.sparkSession)
+  }
+}
+
+case class SimpleInsert(userSpecifiedSchema: StructType)(@transient val sparkSession: SparkSession)
+  extends BaseRelation with InsertableRelation {
+
+  override def sqlContext: SQLContext = sparkSession.sqlContext
+
+  override def schema: StructType = userSpecifiedSchema
+
+  override def insert(input: DataFrame, overwrite: Boolean): Unit = {
+input.collect
+  }
+}
+
 class InsertSuite extends DataSourceTest with SharedSQLContext {
   import testImplicits._
 
@@ -520,4 +544,29 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
   }
 }
   }
+
+  test("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand") {
+withTable("test_table") {
+  val schema = new StructType()
+.add("i", LongType, false)
+.add("s", 

spark git commit: [SPARK-24565][SS] Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame

2018-06-19 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 13092d733 -> 2cb976355


[SPARK-24565][SS] Add API for in Structured Streaming for exposing output rows 
of each microbatch as a DataFrame

## What changes were proposed in this pull request?

Currently, the micro-batches in the MicroBatchExecution are not exposed to the
user through any public API. This was because we did not want to expose the
micro-batches, so that all the APIs we expose can eventually be supported in the
Continuous engine. But now that we have a better sense of building a
ContinuousExecution, I am considering adding APIs which will run only on the
MicroBatchExecution. I have quite a few use cases where exposing the micro-batch
output as a DataFrame is useful.
- Pass the output rows of each batch to a library that is designed only for
batch jobs (for example, many ML libraries need to collect() while learning).
- Reuse batch data sources for output whose streaming version does not exist
(e.g. the Redshift data source).
- Write the output rows to multiple places by writing twice for each batch.
This is not the most elegant thing to do for multiple-output streaming queries,
but it is likely to be better than running two streaming queries processing the
same data twice.

The proposal is to add a method `foreachBatch(f: Dataset[T] => Unit)` to the
Scala/Java/Python `DataStreamWriter`.
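
A hedged usage sketch written against the foreachBatch API as released, which passes the Dataset together with a batch id (the sink paths and names here are illustrative, not part of this commit):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("foreachBatch-sketch").getOrCreate()

// A streaming DataFrame; the rate source is used only to keep the sketch self-contained.
val stream = spark.readStream.format("rate").load()

// Each micro-batch arrives as a plain batch DataFrame, so batch-only sinks and
// libraries can be reused here, e.g. writing the same batch to two places.
def writeBatch(batchDF: DataFrame, batchId: Long): Unit = {
  batchDF.persist()
  batchDF.write.mode("append").parquet(s"/tmp/foreach-batch-sketch/parquet/$batchId")
  batchDF.write.mode("append").json(s"/tmp/foreach-batch-sketch/json/$batchId")
  batchDF.unpersist()
}

val query = stream.writeStream.foreachBatch(writeBatch _).start()
query.awaitTermination()
```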

## How was this patch tested?
New unit tests.

Author: Tathagata Das 

Closes #21571 from tdas/foreachBatch.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2cb97635
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2cb97635
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2cb97635

Branch: refs/heads/master
Commit: 2cb976355c615eee4ebd0a86f3911fa9284fccf6
Parents: 13092d7
Author: Tathagata Das 
Authored: Tue Jun 19 13:56:51 2018 -0700
Committer: Shixiong Zhu 
Committed: Tue Jun 19 13:56:51 2018 -0700

--
 python/pyspark/java_gateway.py  |  25 +++-
 python/pyspark/sql/streaming.py |  33 -
 python/pyspark/sql/tests.py |  36 +
 python/pyspark/sql/utils.py |  23 +++
 python/pyspark/streaming/context.py |  18 +--
 .../streaming/sources/ForeachBatchSink.scala|  58 
 .../spark/sql/streaming/DataStreamWriter.scala  |  63 +++-
 .../sources/ForeachBatchSinkSuite.scala | 148 +++
 8 files changed, 383 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2cb97635/python/pyspark/java_gateway.py
--
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 0afbe9d..fa2d5e8 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -31,7 +31,7 @@ from subprocess import Popen, PIPE
 if sys.version >= '3':
 xrange = range
 
-from py4j.java_gateway import java_import, JavaGateway, GatewayParameters
+from py4j.java_gateway import java_import, JavaGateway, JavaObject, GatewayParameters
 from pyspark.find_spark_home import _find_spark_home
 from pyspark.serializers import read_int, write_with_length, UTF8Deserializer
 
@@ -145,3 +145,26 @@ def do_server_auth(conn, auth_secret):
 if reply != "ok":
 conn.close()
 raise Exception("Unexpected reply from iterator server.")
+
+
+def ensure_callback_server_started(gw):
+    """
+    Start callback server if not already started. The callback server is needed if the Java
+    driver process needs to callback into the Python driver process to execute Python code.
+    """
+
+    # getattr will fallback to JVM, so we cannot test by hasattr()
+    if "_callback_server" not in gw.__dict__ or gw._callback_server is None:
+        gw.callback_server_parameters.eager_load = True
+        gw.callback_server_parameters.daemonize = True
+        gw.callback_server_parameters.daemonize_connections = True
+        gw.callback_server_parameters.port = 0
+        gw.start_callback_server(gw.callback_server_parameters)
+        cbport = gw._callback_server.server_socket.getsockname()[1]
+        gw._callback_server.port = cbport
+        # gateway with real port
+        gw._python_proxy_port = gw._callback_server.port
+        # get the GatewayServer object in JVM by ID
+        jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
+        # update the port of CallbackClient with real port
+        jgws.resetCallbackClient(jgws.getCallbackClient().getAddress(), gw._python_proxy_port)

http://git-wip-us.apache.org/repos/asf/spark/blob/2cb97635/python/pyspark/sql/streaming.py
--
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py

spark git commit: [SPARK-24534][K8S] Bypass non spark-on-k8s commands

2018-06-19 Thread eje
Repository: spark
Updated Branches:
  refs/heads/master 9dbe53eb6 -> 13092d733


[SPARK-24534][K8S] Bypass non spark-on-k8s commands

## What changes were proposed in this pull request?
This PR changes entrypoint.sh to provide an option to run commands other than
the spark-on-k8s ones (init, driver, executor), so that users can keep their
normal workflow without hacking the image to bypass the entrypoint.

## How was this patch tested?
This patch was built manually on my local machine, and I ran some tests with a
combination of ```docker run``` commands.

Author: rimolive 

Closes #21572 from rimolive/rimolive-spark-24534.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13092d73
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13092d73
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13092d73

Branch: refs/heads/master
Commit: 13092d733791b19cd7994084178306e0c449f2ed
Parents: 9dbe53e
Author: rimolive 
Authored: Tue Jun 19 13:25:00 2018 -0700
Committer: Erik Erlandson 
Committed: Tue Jun 19 13:25:00 2018 -0700

--
 .../src/main/dockerfiles/spark/entrypoint.sh   | 17 +++--
 1 file changed, 11 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/13092d73/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
--
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index acdb4b1..2f4e115 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -37,11 +37,17 @@ if [ -z "$uidentry" ] ; then
 fi
 
 SPARK_K8S_CMD="$1"
-if [ -z "$SPARK_K8S_CMD" ]; then
-  echo "No command to execute has been provided." 1>&2
-  exit 1
-fi
-shift 1
+case "$SPARK_K8S_CMD" in
+driver | driver-py | executor)
+  shift 1
+  ;;
+"")
+  ;;
+*)
+  echo "Non-spark-on-k8s command provided, proceeding in pass-through 
mode..."
+  exec /sbin/tini -s -- "$@"
+  ;;
+esac
 
 SPARK_CLASSPATH="$SPARK_CLASSPATH:${SPARK_HOME}/jars/*"
 env | grep SPARK_JAVA_OPT_ | sort -t_ -k4 -n | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt
@@ -92,7 +98,6 @@ case "$SPARK_K8S_CMD" in
   "$@" $PYSPARK_PRIMARY $PYSPARK_ARGS
 )
 ;;
-
   executor)
 CMD=(
   ${JAVA_HOME}/bin/java





svn commit: r27566 - in /dev/spark/2.4.0-SNAPSHOT-2018_06_19_12_02-9dbe53e-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-06-19 Thread pwendell
Author: pwendell
Date: Tue Jun 19 19:17:17 2018
New Revision: 27566

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_06_19_12_02-9dbe53e docs


[This commit notification would consist of 1468 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-24556][SQL] Always rewrite output partitioning in ReusedExchangeExec and InMemoryTableScanExec

2018-06-19 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master a78a90464 -> 9dbe53eb6


[SPARK-24556][SQL] Always rewrite output partitioning in ReusedExchangeExec and 
InMemoryTableScanExec

## What changes were proposed in this pull request?

Currently, ReusedExchangeExec and InMemoryTableScanExec only rewrite the output
partitioning if the child's partitioning is HashPartitioning, and they do nothing
for other partitionings, e.g., RangePartitioning. We should always rewrite it;
otherwise, an unnecessary shuffle can be introduced, as described in
https://issues.apache.org/jira/browse/SPARK-24556.
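
A hedged inspection sketch (assumes a local SparkSession named `spark`; the actual repro in SPARK-24556 involves attribute aliasing across a reused exchange, which this only approximates):

```scala
// Cache a range-partitioned (sorted) DataFrame and materialize it.
val df = spark.range(0, 1000).toDF("id").sort("id")
df.cache()
df.count()

// Planning another sort over the cached data lets us check whether an extra
// Exchange shows up: with the partitioning rewritten for the in-memory scan,
// the existing RangePartitioning can satisfy the required distribution.
df.sort("id").explain(true)
```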

## How was this patch tested?

Add new tests.

Author: yucai 

Closes #21564 from yucai/SPARK-24556.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9dbe53eb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9dbe53eb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9dbe53eb

Branch: refs/heads/master
Commit: 9dbe53eb6bb5916d28000f2c0d646cf23094ac11
Parents: a78a904
Author: yucai 
Authored: Tue Jun 19 10:52:51 2018 -0700
Committer: Wenchen Fan 
Committed: Tue Jun 19 10:52:51 2018 -0700

--
 .../columnar/InMemoryTableScanExec.scala|  6 +-
 .../spark/sql/execution/exchange/Exchange.scala |  4 +-
 .../spark/sql/execution/PlannerSuite.scala  | 64 +++-
 3 files changed, 67 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9dbe53eb/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 0b4dd76..997cf92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, SparkPlan, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.vectorized._
 import org.apache.spark.sql.types._
@@ -169,8 +169,8 @@ case class InMemoryTableScanExec(
   // But the cached version could alias output, so we need to replace output.
   override def outputPartitioning: Partitioning = {
 relation.cachedPlan.outputPartitioning match {
-      case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
-      case _ => relation.cachedPlan.outputPartitioning
+      case e: Expression => updateAttribute(e).asInstanceOf[Partitioning]
+      case other => other
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9dbe53eb/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
index 09f79a2..1a5b759 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
@@ -24,7 +24,7 @@ import org.apache.spark.broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression, SortOrder}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.internal.SQLConf
@@ -70,7 +70,7 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   }
 
   override def outputPartitioning: Partitioning = child.outputPartitioning match {
-    case h: HashPartitioning => h.copy(expressions = h.expressions.map(updateAttr))
+    case e: Expression => updateAttr(e).asInstanceOf[Partitioning]
     case other => other
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9dbe53eb/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
--
diff 

spark git commit: [SPARK-24521][SQL][TEST] Fix ineffective test in CachedTableSuite

2018-06-19 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 9a75c1829 -> a78a90464


[SPARK-24521][SQL][TEST] Fix ineffective test in CachedTableSuite

## What changes were proposed in this pull request?

test("withColumn doesn't invalidate cached dataframe") in CachedTableSuite 
doesn't not work because:

The UDF is executed and test count incremented when "df.cache()" is called and 
the subsequent "df.collect()" has no effect on the test result.

This PR fixed this test and add another test for caching UDF.

## How was this patch tested?

Add new tests.

Author: Li Jin 

Closes #21531 from icexelloss/fix-cache-test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a78a9046
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a78a9046
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a78a9046

Branch: refs/heads/master
Commit: a78a9046413255756653f70165520efd486fb493
Parents: 9a75c18
Author: Li Jin 
Authored: Tue Jun 19 10:42:08 2018 -0700
Committer: Xiao Li 
Committed: Tue Jun 19 10:42:08 2018 -0700

--
 .../org/apache/spark/sql/CachedTableSuite.scala | 19 --
 .../apache/spark/sql/DatasetCacheSuite.scala| 38 +++-
 2 files changed, 37 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a78a9046/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 81b7e18..6982c22 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -83,25 +83,6 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
 }.sum
   }
 
-  test("withColumn doesn't invalidate cached dataframe") {
-var evalCount = 0
-val myUDF = udf((x: String) => { evalCount += 1; "result" })
-val df = Seq(("test", 1)).toDF("s", "i").select(myUDF($"s"))
-df.cache()
-
-df.collect()
-assert(evalCount === 1)
-
-df.collect()
-assert(evalCount === 1)
-
-val df2 = df.withColumn("newColumn", lit(1))
-df2.collect()
-
-// We should not reevaluate the cached dataframe
-assert(evalCount === 1)
-  }
-
   test("cache temp table") {
 withTempView("tempTable") {
   testData.select('key).createOrReplaceTempView("tempTable")

http://git-wip-us.apache.org/repos/asf/spark/blob/a78a9046/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index e0561ee..82a93f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.sql
 
+import org.scalatest.concurrent.TimeLimits
+import org.scalatest.time.SpanSugar._
+
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.storage.StorageLevel
 
 
-class DatasetCacheSuite extends QueryTest with SharedSQLContext {
+class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits {
   import testImplicits._
 
   test("get storage level") {
@@ -96,4 +99,37 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
 agged.unpersist()
 assert(agged.storageLevel == StorageLevel.NONE, "The Dataset agged should not be cached.")
   }
+
+  test("persist and then withColumn") {
+val df = Seq(("test", 1)).toDF("s", "i")
+val df2 = df.withColumn("newColumn", lit(1))
+
+df.cache()
+assertCached(df)
+assertCached(df2)
+
+df.count()
+assertCached(df2)
+
+df.unpersist()
+assert(df.storageLevel == StorageLevel.NONE)
+  }
+
+  test("cache UDF result correctly") {
+val expensiveUDF = udf({x: Int => Thread.sleep(1); x})
+val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a"))
+val df2 = df.agg(sum(df("b")))
+
+df.cache()
+df.count()
+assertCached(df2)
+
+// udf has been evaluated during caching, and thus should not be re-evaluated here
+failAfter(5 seconds) {
+  df2.collect()
+}
+
+df.unpersist()
+assert(df.storageLevel == StorageLevel.NONE)
+  }
 }





svn commit: r27555 - in /dev/spark/2.4.0-SNAPSHOT-2018_06_19_00_01-9a75c18-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-06-19 Thread pwendell
Author: pwendell
Date: Tue Jun 19 07:17:56 2018
New Revision: 27555

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_06_19_00_01-9a75c18 docs


[This commit notification would consist of 1468 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]
