spark git commit: [SPARK-23778][CORE] Avoid unneeded shuffle when union gets an empty RDD
Repository: spark Updated Branches: refs/heads/master bc0498d58 -> bc111463a [SPARK-23778][CORE] Avoid unneeded shuffle when union gets an empty RDD ## What changes were proposed in this pull request? When a `union` is invoked on several RDDs of which one is an empty RDD, the result of the operation is a `UnionRDD`. This causes an unneeded extra shuffle when all the other RDDs have the same partitioning. The PR ignores incoming empty RDDs in the union method. ## How was this patch tested? Added a unit test. Author: Marco Gaido Closes #21333 from mgaido91/SPARK-23778. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc111463 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc111463 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc111463 Branch: refs/heads/master Commit: bc111463a766a5619966a282fbe0fec991088ceb Parents: bc0498d Author: Marco Gaido Authored: Tue Jun 19 22:29:00 2018 -0700 Committer: Wenchen Fan Committed: Tue Jun 19 22:29:00 2018 -0700 -- .../main/scala/org/apache/spark/SparkContext.scala| 9 + .../test/scala/org/apache/spark/rdd/RDDSuite.scala| 14 +- 2 files changed, 18 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bc111463/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 5e85956..74bfb5d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1306,11 +1306,12 @@ class SparkContext(config: SparkConf) extends Logging { /** Build the union of a list of RDDs. */ def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope { -val partitioners = rdds.flatMap(_.partitioner).toSet -if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) { - new PartitionerAwareUnionRDD(this, rdds) +val nonEmptyRdds = rdds.filter(!_.partitions.isEmpty) +val partitioners = nonEmptyRdds.flatMap(_.partitioner).toSet +if (nonEmptyRdds.forall(_.partitioner.isDefined) && partitioners.size == 1) { + new PartitionerAwareUnionRDD(this, nonEmptyRdds) } else { - new UnionRDD(this, rdds) + new UnionRDD(this, nonEmptyRdds) } } http://git-wip-us.apache.org/repos/asf/spark/blob/bc111463/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 191c612..5148ce0 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -154,6 +154,16 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { } } + test("SPARK-23778: empty RDD in union should not produce a UnionRDD") { +val rddWithPartitioner = sc.parallelize(Seq(1 -> true)).partitionBy(new HashPartitioner(1)) +val emptyRDD = sc.emptyRDD[(Int, Boolean)] +val unionRDD = sc.union(emptyRDD, rddWithPartitioner) +assert(unionRDD.isInstanceOf[PartitionerAwareUnionRDD[_]]) +val unionAllEmptyRDD = sc.union(emptyRDD, emptyRDD) +assert(unionAllEmptyRDD.isInstanceOf[UnionRDD[_]]) +assert(unionAllEmptyRDD.collect().isEmpty) + } + test("partitioner aware union") { def makeRDDWithPartitioner(seq: Seq[Int]): RDD[Int] = { sc.makeRDD(seq, 1) @@ -1047,7 +1057,9 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { private class CyclicalDependencyRDD[T: ClassTag] extends RDD[T](sc, Nil) { private val mutableDependencies: 
ArrayBuffer[Dependency[_]] = ArrayBuffer.empty override def compute(p: Partition, c: TaskContext): Iterator[T] = Iterator.empty -override def getPartitions: Array[Partition] = Array.empty +override def getPartitions: Array[Partition] = Array(new Partition { + override def index: Int = 0 +}) override def getDependencies: Seq[Dependency[_]] = mutableDependencies def addDependency(dep: Dependency[_]) { mutableDependencies += dep
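For illustration only (not part of the patch), a minimal sketch of the behavior this change targets; the local master, app name, and example data are assumptions:

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object UnionEmptyRddSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("union-empty-rdd"))

    // Two RDDs that share the same partitioner, plus an empty RDD.
    val a = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(new HashPartitioner(4))
    val b = sc.parallelize(Seq(3 -> "c")).partitionBy(new HashPartitioner(4))
    val empty = sc.emptyRDD[(Int, String)]

    // With this change the empty RDD is ignored, so the union stays partitioner-aware
    // (a PartitionerAwareUnionRDD) and keeps the shared HashPartitioner; a later
    // reduceByKey with the same partitioner therefore needs no extra shuffle.
    val unioned = sc.union(Seq(a, b, empty))
    println(unioned.partitioner) // after the fix: Some(<HashPartitioner with 4 partitions>)

    sc.stop()
  }
}
```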
svn commit: r27571 - in /dev/spark/2.3.2-SNAPSHOT-2018_06_19_18_01-d687d97-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Jun 20 01:16:54 2018 New Revision: 27571 Log: Apache Spark 2.3.2-SNAPSHOT-2018_06_19_18_01-d687d97 docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
svn commit: r27569 - in /dev/spark/2.4.0-SNAPSHOT-2018_06_19_16_01-bc0498d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Jun 19 23:15:41 2018 New Revision: 27569 Log: Apache Spark 2.4.0-SNAPSHOT-2018_06_19_16_01-bc0498d docs [This commit notification would consist of 1468 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24583][SQL] Wrong schema type in InsertIntoDataSourceCommand
Repository: spark Updated Branches: refs/heads/master 2cb976355 -> bc0498d58 [SPARK-24583][SQL] Wrong schema type in InsertIntoDataSourceCommand ## What changes were proposed in this pull request? Change insert input schema type: "insertRelationType" -> "insertRelationType.asNullable", in order to avoid nullable being overridden. ## How was this patch tested? Added one test in InsertSuite. Author: Maryann Xue Closes #21585 from maryannxue/spark-24583. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc0498d5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc0498d5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc0498d5 Branch: refs/heads/master Commit: bc0498d5820ded2b428277e396502e74ef0ce36d Parents: 2cb9763 Author: Maryann Xue Authored: Tue Jun 19 15:27:20 2018 -0700 Committer: Xiao Li Committed: Tue Jun 19 15:27:20 2018 -0700 -- .../InsertIntoDataSourceCommand.scala | 5 +- .../apache/spark/sql/sources/InsertSuite.scala | 51 +++- 2 files changed, 52 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bc0498d5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala index a813829..80d7608 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala @@ -38,9 +38,8 @@ case class InsertIntoDataSourceCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val relation = logicalRelation.relation.asInstanceOf[InsertableRelation] val data = Dataset.ofRows(sparkSession, query) -// Apply the schema of the existing table to the new data. -val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema) -relation.insert(df, overwrite) +// Data has been casted to the target relation's schema by the PreprocessTableInsertion rule. +relation.insert(data, overwrite) // Re-cache all cached plans(including this relation itself, if it's cached) that refer to this // data source relation. 
http://git-wip-us.apache.org/repos/asf/spark/blob/bc0498d5/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index fef01c8..438d5d8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -20,12 +20,36 @@ package org.apache.spark.sql.sources import java.io.File import org.apache.spark.SparkException -import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +class SimpleInsertSource extends SchemaRelationProvider { + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { +SimpleInsert(schema)(sqlContext.sparkSession) + } +} + +case class SimpleInsert(userSpecifiedSchema: StructType)(@transient val sparkSession: SparkSession) + extends BaseRelation with InsertableRelation { + + override def sqlContext: SQLContext = sparkSession.sqlContext + + override def schema: StructType = userSpecifiedSchema + + override def insert(input: DataFrame, overwrite: Boolean): Unit = { +input.collect + } +} + class InsertSuite extends DataSourceTest with SharedSQLContext { import testImplicits._ @@ -520,4 +544,29 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { } } } + + test("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand") { +withTable("test_table") { + val schema = new StructType() +.add("i", LongType, false) +.add("s", StringType, false) + val newTable = CatalogTable( +identifier = TableIdentifier("test_table",
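As a hedged sketch of the scenario the new InsertSuite test exercises (the fully qualified source name and the table/column names are illustrative assumptions; the actual test registers the table through the internal catalog API with a non-nullable schema):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("spark-24583-sketch").getOrCreate()

// Assumes a SchemaRelationProvider + InsertableRelation pair like the SimpleInsertSource /
// SimpleInsert classes added by this patch's test is available under this (illustrative) name.
spark.sql(
  "CREATE TABLE test_table(i INT, s STRING) USING org.apache.spark.sql.sources.SimpleInsertSource")

// The query below produces a nullable schema. Before this change, InsertIntoDataSourceCommand
// re-applied logicalRelation.schema to the incoming rows, overriding that nullability; now the
// Dataset already cast by the PreprocessTableInsertion rule is passed to insert() unchanged.
spark.sql("INSERT INTO TABLE test_table SELECT 1, 'a'")
```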
spark git commit: [SPARK-24583][SQL] Wrong schema type in InsertIntoDataSourceCommand
Repository: spark Updated Branches: refs/heads/branch-2.3 50cdb4138 -> d687d97b1 [SPARK-24583][SQL] Wrong schema type in InsertIntoDataSourceCommand ## What changes were proposed in this pull request? Change insert input schema type: "insertRelationType" -> "insertRelationType.asNullable", in order to avoid nullable being overridden. ## How was this patch tested? Added one test in InsertSuite. Author: Maryann Xue Closes #21585 from maryannxue/spark-24583. (cherry picked from commit bc0498d5820ded2b428277e396502e74ef0ce36d) Signed-off-by: Xiao Li Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d687d97b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d687d97b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d687d97b Branch: refs/heads/branch-2.3 Commit: d687d97b116beafa7f4375f1876049f5da4f5ba7 Parents: 50cdb41 Author: Maryann Xue Authored: Tue Jun 19 15:27:20 2018 -0700 Committer: Xiao Li Committed: Tue Jun 19 15:27:30 2018 -0700 -- .../InsertIntoDataSourceCommand.scala | 5 +- .../apache/spark/sql/sources/InsertSuite.scala | 51 +++- 2 files changed, 52 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d687d97b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala index a813829..80d7608 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala @@ -38,9 +38,8 @@ case class InsertIntoDataSourceCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val relation = logicalRelation.relation.asInstanceOf[InsertableRelation] val data = Dataset.ofRows(sparkSession, query) -// Apply the schema of the existing table to the new data. -val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema) -relation.insert(df, overwrite) +// Data has been casted to the target relation's schema by the PreprocessTableInsertion rule. +relation.insert(data, overwrite) // Re-cache all cached plans(including this relation itself, if it's cached) that refer to this // data source relation. 
http://git-wip-us.apache.org/repos/asf/spark/blob/d687d97b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index fef01c8..438d5d8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -20,12 +20,36 @@ package org.apache.spark.sql.sources import java.io.File import org.apache.spark.SparkException -import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +class SimpleInsertSource extends SchemaRelationProvider { + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { +SimpleInsert(schema)(sqlContext.sparkSession) + } +} + +case class SimpleInsert(userSpecifiedSchema: StructType)(@transient val sparkSession: SparkSession) + extends BaseRelation with InsertableRelation { + + override def sqlContext: SQLContext = sparkSession.sqlContext + + override def schema: StructType = userSpecifiedSchema + + override def insert(input: DataFrame, overwrite: Boolean): Unit = { +input.collect + } +} + class InsertSuite extends DataSourceTest with SharedSQLContext { import testImplicits._ @@ -520,4 +544,29 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { } } } + + test("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand") { +withTable("test_table") { + val schema = new StructType() +.add("i", LongType, false) +.add("s",
spark git commit: [SPARK-24565][SS] Add API in Structured Streaming for exposing output rows of each microbatch as a DataFrame
Repository: spark Updated Branches: refs/heads/master 13092d733 -> 2cb976355 [SPARK-24565][SS] Add API in Structured Streaming for exposing output rows of each microbatch as a DataFrame ## What changes were proposed in this pull request? Currently, the micro-batches in the MicroBatchExecution are not exposed to the user through any public API. This was because we did not want to expose the micro-batches, so that all the APIs we expose can eventually be supported in the Continuous engine. But now that we have a better sense of building a ContinuousExecution, I am considering adding APIs which will run only the MicroBatchExecution. I have quite a few use cases where exposing the microbatch output as a dataframe is useful. - Pass the output rows of each batch to a library that is designed only for batch jobs (for example, many ML libraries need to collect() while learning). - Reuse batch data sources for output whose streaming version does not exist (e.g. redshift data source). - Write the output rows to multiple places by writing twice for each batch. This is not the most elegant thing to do for multiple-output streaming queries but is likely to be better than running two streaming queries processing the same data twice. The proposal is to add a method `foreachBatch(f: Dataset[T] => Unit)` to Scala/Java/Python `DataStreamWriter`. ## How was this patch tested? New unit tests. Author: Tathagata Das Closes #21571 from tdas/foreachBatch. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2cb97635 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2cb97635 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2cb97635 Branch: refs/heads/master Commit: 2cb976355c615eee4ebd0a86f3911fa9284fccf6 Parents: 13092d7 Author: Tathagata Das Authored: Tue Jun 19 13:56:51 2018 -0700 Committer: Shixiong Zhu Committed: Tue Jun 19 13:56:51 2018 -0700 -- python/pyspark/java_gateway.py | 25 +++- python/pyspark/sql/streaming.py | 33 - python/pyspark/sql/tests.py | 36 + python/pyspark/sql/utils.py | 23 +++ python/pyspark/streaming/context.py | 18 +-- .../streaming/sources/ForeachBatchSink.scala| 58 .../spark/sql/streaming/DataStreamWriter.scala | 63 +++- .../sources/ForeachBatchSinkSuite.scala | 148 +++ 8 files changed, 383 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2cb97635/python/pyspark/java_gateway.py -- diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 0afbe9d..fa2d5e8 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -31,7 +31,7 @@ from subprocess import Popen, PIPE if sys.version >= '3': xrange = range -from py4j.java_gateway import java_import, JavaGateway, GatewayParameters +from py4j.java_gateway import java_import, JavaGateway, JavaObject, GatewayParameters from pyspark.find_spark_home import _find_spark_home from pyspark.serializers import read_int, write_with_length, UTF8Deserializer @@ -145,3 +145,26 @@ def do_server_auth(conn, auth_secret): if reply != "ok": conn.close() raise Exception("Unexpected reply from iterator server.") + + +def ensure_callback_server_started(gw): +""" +Start callback server if not already started. The callback server is needed if the Java +driver process needs to callback into the Python driver process to execute Python code. 
+""" + +# getattr will fallback to JVM, so we cannot test by hasattr() +if "_callback_server" not in gw.__dict__ or gw._callback_server is None: +gw.callback_server_parameters.eager_load = True +gw.callback_server_parameters.daemonize = True +gw.callback_server_parameters.daemonize_connections = True +gw.callback_server_parameters.port = 0 +gw.start_callback_server(gw.callback_server_parameters) +cbport = gw._callback_server.server_socket.getsockname()[1] +gw._callback_server.port = cbport +# gateway with real port +gw._python_proxy_port = gw._callback_server.port +# get the GatewayServer object in JVM by ID +jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client) +# update the port of CallbackClient with real port +jgws.resetCallbackClient(jgws.getCallbackClient().getAddress(), gw._python_proxy_port) http://git-wip-us.apache.org/repos/asf/spark/blob/2cb97635/python/pyspark/sql/streaming.py -- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
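A hedged usage sketch of the new Scala API (the socket source, host/port, and output path are placeholders; the two-argument `(Dataset[T], Long) => Unit` form is the variant added to `DataStreamWriter`):

```scala
import org.apache.spark.sql.{Dataset, Row, SparkSession}

val spark = SparkSession.builder().master("local[2]").appName("foreach-batch-sketch").getOrCreate()

// Placeholder streaming source for the sketch.
val streamDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()

val query = streamDF.writeStream
  .foreachBatch { (batchDF: Dataset[Row], batchId: Long) =>
    // Each micro-batch is exposed as an ordinary Dataset, so batch-only sinks and
    // libraries (e.g. ones that need collect()) can be reused, or the same batch can
    // be written to several places without running two streaming queries.
    batchDF.persist()
    batchDF.write.mode("append").parquet(s"/tmp/foreach-batch-sketch/batch=$batchId")
    batchDF.unpersist()
  }
  .start()

query.awaitTermination()
```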
spark git commit: [SPARK-24534][K8S] Bypass non spark-on-k8s commands
Repository: spark Updated Branches: refs/heads/master 9dbe53eb6 -> 13092d733 [SPARK-24534][K8S] Bypass non spark-on-k8s commands ## What changes were proposed in this pull request? This PR changes the entrypoint.sh to provide an option to run non spark-on-k8s commands (init, driver, executor) in order to let the user keep the normal workflow without hacking the image to bypass the entrypoint. ## How was this patch tested? This patch was built manually on my local machine and I ran some tests with a combination of ```docker run``` commands. Author: rimolive Closes #21572 from rimolive/rimolive-spark-24534. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13092d73 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13092d73 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13092d73 Branch: refs/heads/master Commit: 13092d733791b19cd7994084178306e0c449f2ed Parents: 9dbe53e Author: rimolive Authored: Tue Jun 19 13:25:00 2018 -0700 Committer: Erik Erlandson Committed: Tue Jun 19 13:25:00 2018 -0700 -- .../src/main/dockerfiles/spark/entrypoint.sh | 17 +++-- 1 file changed, 11 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/13092d73/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh -- diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh index acdb4b1..2f4e115 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh @@ -37,11 +37,17 @@ if [ -z "$uidentry" ] ; then fi SPARK_K8S_CMD="$1" -if [ -z "$SPARK_K8S_CMD" ]; then - echo "No command to execute has been provided." 1>&2 - exit 1 -fi -shift 1 +case "$SPARK_K8S_CMD" in +driver | driver-py | executor) + shift 1 + ;; +"") + ;; +*) + echo "Non-spark-on-k8s command provided, proceeding in pass-through mode..." + exec /sbin/tini -s -- "$@" + ;; +esac SPARK_CLASSPATH="$SPARK_CLASSPATH:${SPARK_HOME}/jars/*" env | grep SPARK_JAVA_OPT_ | sort -t_ -k4 -n | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt @@ -92,7 +98,6 @@ case "$SPARK_K8S_CMD" in "$@" $PYSPARK_PRIMARY $PYSPARK_ARGS ) ;; - executor) CMD=( ${JAVA_HOME}/bin/java
svn commit: r27566 - in /dev/spark/2.4.0-SNAPSHOT-2018_06_19_12_02-9dbe53e-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Jun 19 19:17:17 2018 New Revision: 27566 Log: Apache Spark 2.4.0-SNAPSHOT-2018_06_19_12_02-9dbe53e docs [This commit notification would consist of 1468 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24556][SQL] Always rewrite output partitioning in ReusedExchangeExec and InMemoryTableScanExec
Repository: spark Updated Branches: refs/heads/master a78a90464 -> 9dbe53eb6 [SPARK-24556][SQL] Always rewrite output partitioning in ReusedExchangeExec and InMemoryTableScanExec ## What changes were proposed in this pull request? Currently, ReusedExchange and InMemoryTableScanExec only rewrite the output partitioning if the child's partitioning is HashPartitioning, and do nothing for other partitionings, e.g., RangePartitioning. We should always rewrite it; otherwise, an unnecessary shuffle could be introduced, as in https://issues.apache.org/jira/browse/SPARK-24556. ## How was this patch tested? Added new tests. Author: yucai Closes #21564 from yucai/SPARK-24556. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9dbe53eb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9dbe53eb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9dbe53eb Branch: refs/heads/master Commit: 9dbe53eb6bb5916d28000f2c0d646cf23094ac11 Parents: a78a904 Author: yucai Authored: Tue Jun 19 10:52:51 2018 -0700 Committer: Wenchen Fan Committed: Tue Jun 19 10:52:51 2018 -0700 -- .../columnar/InMemoryTableScanExec.scala| 6 +- .../spark/sql/execution/exchange/Exchange.scala | 4 +- .../spark/sql/execution/PlannerSuite.scala | 64 +++- 3 files changed, 67 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9dbe53eb/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 0b4dd76..997cf92 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, SparkPlan, WholeStageCodegenExec} import org.apache.spark.sql.execution.vectorized._ import org.apache.spark.sql.types._ @@ -169,8 +169,8 @@ case class InMemoryTableScanExec( // But the cached version could alias output, so we need to replace output. 
override def outputPartitioning: Partitioning = { relation.cachedPlan.outputPartitioning match { - case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning] - case _ => relation.cachedPlan.outputPartitioning + case e: Expression => updateAttribute(e).asInstanceOf[Partitioning] + case other => other } } http://git-wip-us.apache.org/repos/asf/spark/blob/9dbe53eb/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala index 09f79a2..1a5b759 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala @@ -24,7 +24,7 @@ import org.apache.spark.broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression, SortOrder} -import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, UnaryExecNode} import org.apache.spark.sql.internal.SQLConf @@ -70,7 +70,7 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan } override def outputPartitioning: Partitioning = child.outputPartitioning match { -case h: HashPartitioning => h.copy(expressions = h.expressions.map(updateAttr)) +case e: Expression => updateAttr(e).asInstanceOf[Partitioning] case other => other } http://git-wip-us.apache.org/repos/asf/spark/blob/9dbe53eb/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala -- diff
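For illustration, a hedged sketch of the kind of plan the new PlannerSuite tests cover (the data and column name are made up, and the exact physical plan depends on configuration):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("spark-24556-sketch").getOrCreate()
import spark.implicits._

val sorted = spark.range(0, 1000).select($"id".as("a")).sort("a") // RangePartitioning on "a"
sorted.cache()
sorted.count()

// Before this change, InMemoryTableScanExec rewrote only HashPartitioning, so the cached
// RangePartitioning was reported with stale attribute ids and the sort below could force an
// extra range Exchange; after it, any Expression-based partitioning is rewritten to the
// scan's own output attributes. Compare the plans to see whether an Exchange is added:
sorted.sort("a").explain()
```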
spark git commit: [SPARK-24521][SQL][TEST] Fix ineffective test in CachedTableSuite
Repository: spark Updated Branches: refs/heads/master 9a75c1829 -> a78a90464 [SPARK-24521][SQL][TEST] Fix ineffective test in CachedTableSuite ## What changes were proposed in this pull request? test("withColumn doesn't invalidate cached dataframe") in CachedTableSuite does not work because: The UDF is executed and the test count incremented when "df.cache()" is called, so the subsequent "df.collect()" has no effect on the test result. This PR fixes this test and adds another test for caching a UDF. ## How was this patch tested? Added new tests. Author: Li Jin Closes #21531 from icexelloss/fix-cache-test. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a78a9046 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a78a9046 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a78a9046 Branch: refs/heads/master Commit: a78a9046413255756653f70165520efd486fb493 Parents: 9a75c18 Author: Li Jin Authored: Tue Jun 19 10:42:08 2018 -0700 Committer: Xiao Li Committed: Tue Jun 19 10:42:08 2018 -0700 -- .../org/apache/spark/sql/CachedTableSuite.scala | 19 -- .../apache/spark/sql/DatasetCacheSuite.scala| 38 +++- 2 files changed, 37 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a78a9046/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 81b7e18..6982c22 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -83,25 +83,6 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext }.sum } - test("withColumn doesn't invalidate cached dataframe") { -var evalCount = 0 -val myUDF = udf((x: String) => { evalCount += 1; "result" }) -val df = Seq(("test", 1)).toDF("s", "i").select(myUDF($"s")) -df.cache() - -df.collect() -assert(evalCount === 1) - -df.collect() -assert(evalCount === 1) - -val df2 = df.withColumn("newColumn", lit(1)) -df2.collect() - -// We should not reevaluate the cached dataframe -assert(evalCount === 1) - } - test("cache temp table") { withTempView("tempTable") { testData.select('key).createOrReplaceTempView("tempTable") http://git-wip-us.apache.org/repos/asf/spark/blob/a78a9046/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala index e0561ee..82a93f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala @@ -17,12 +17,15 @@ package org.apache.spark.sql +import org.scalatest.concurrent.TimeLimits +import org.scalatest.time.SpanSugar._ + import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.storage.StorageLevel -class DatasetCacheSuite extends QueryTest with SharedSQLContext { +class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits { import testImplicits._ test("get storage level") { @@ -96,4 +99,37 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext { agged.unpersist() assert(agged.storageLevel == StorageLevel.NONE, "The Dataset agged should not be cached.") } + + test("persist and then withColumn") { +val df = 
Seq(("test", 1)).toDF("s", "i") +val df2 = df.withColumn("newColumn", lit(1)) + +df.cache() +assertCached(df) +assertCached(df2) + +df.count() +assertCached(df2) + +df.unpersist() +assert(df.storageLevel == StorageLevel.NONE) + } + + test("cache UDF result correctly") { +val expensiveUDF = udf({x: Int => Thread.sleep(1); x}) +val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a")) +val df2 = df.agg(sum(df("b"))) + +df.cache() +df.count() +assertCached(df2) + +// udf has been evaluated during caching, and thus should not be re-evaluated here +failAfter(5 seconds) { + df2.collect() +} + +df.unpersist() +assert(df.storageLevel == StorageLevel.NONE) + } }
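A hedged, stand-alone sketch of the invariant the new DatasetCacheSuite tests check (the sleep duration and plan inspection are illustrative, not the test's exact assertions):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.functions.{sum, udf}

val spark = SparkSession.builder().master("local[2]").appName("spark-24521-sketch").getOrCreate()
import spark.implicits._

val expensiveUDF = udf({ x: Int => Thread.sleep(1000); x })
val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a"))
val df2 = df.agg(sum(df("b")))

df.cache()
df.count() // materializes the cache; the UDF is evaluated during this job

// The aggregate over the cached DataFrame should read from the in-memory columnar cache
// rather than re-running the UDF; one way to check is to look for InMemoryTableScanExec
// in the physical plan, then confirm the collect finishes far faster than 10 x 1s.
val usesCache = df2.queryExecution.executedPlan.collect { case s: InMemoryTableScanExec => s }.nonEmpty
assert(usesCache)
df2.collect()
```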
svn commit: r27555 - in /dev/spark/2.4.0-SNAPSHOT-2018_06_19_00_01-9a75c18-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Jun 19 07:17:56 2018 New Revision: 27555 Log: Apache Spark 2.4.0-SNAPSHOT-2018_06_19_00_01-9a75c18 docs [This commit notification would consist of 1468 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]