spark git commit: [SPARK-21866][ML][PYTHON][FOLLOWUP] Few cleanups and fix image test failure in Python 3.6.0 / NumPy 1.13.3
Repository: spark
Updated Branches: refs/heads/master ab6f60c4d -> 92cfbeeb5

[SPARK-21866][ML][PYTHON][FOLLOWUP] Few cleanups and fix image test failure in Python 3.6.0 / NumPy 1.13.3

## What changes were proposed in this pull request?

The image test fails in Python 3.6.0 / NumPy 1.13.3. I manually tested as below:

```
==
ERROR: test_read_images (pyspark.ml.tests.ImageReaderTest)
--
Traceback (most recent call last):
  File "/.../spark/python/pyspark/ml/tests.py", line 1831, in test_read_images
    self.assertEqual(ImageSchema.toImage(array, origin=first_row[0]), first_row)
  File "/.../spark/python/pyspark/ml/image.py", line 149, in toImage
    data = bytearray(array.astype(dtype=np.uint8).ravel())
TypeError: only integer scalar arrays can be converted to a scalar index
--
Ran 1 test in 7.606s
```

To be clear, the error appears to come from NumPy - https://github.com/numpy/numpy/blob/75b2d5d427afdb1392f2a0b2092e0767e4bab53d/numpy/core/src/multiarray/number.c#L947

A smaller reproduction:

```python
>>> import numpy as np
>>> bytearray(np.array([1]).astype(dtype=np.uint8))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: only integer scalar arrays can be converted to a scalar index
```

In Python 2.7 / NumPy 1.13.1, it prints:

```
bytearray(b'\x01')
```

So, here, I simply worked around it by converting the array to bytes first:

```python
>>> bytearray(np.array([1]).astype(dtype=np.uint8).tobytes())
bytearray(b'\x01')
```

Also, while looking into this again, I realised that a few arguments could be quite confusing, for example, a `Row` that needs some specific attributes, and `numpy.ndarray`. I added some type checks and tests accordingly, so an error message is now shown, as below:

```
TypeError: array argument should be numpy.ndarray; however, it got [].
```

## How was this patch tested?

Manually tested with `./python/run-tests`.
And also:

```
PYSPARK_PYTHON=python3 SPARK_TESTING=1 bin/pyspark pyspark.ml.tests ImageReaderTest
```

Author: hyukjinkwon

Closes #19835 from HyukjinKwon/SPARK-21866-followup.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92cfbeeb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92cfbeeb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92cfbeeb
Branch: refs/heads/master
Commit: 92cfbeeb5ce9e2c618a76b3fe60ce84b9d38605b
Parents: ab6f60c
Author: hyukjinkwon
Authored: Thu Nov 30 10:26:55 2017 +0900
Committer: hyukjinkwon
Committed: Thu Nov 30 10:26:55 2017 +0900

--
 python/pyspark/ml/image.py | 27 ---
 python/pyspark/ml/tests.py | 20 +++-
 2 files changed, 43 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/92cfbeeb/python/pyspark/ml/image.py
--
diff --git a/python/pyspark/ml/image.py b/python/pyspark/ml/image.py
index 7d14f05..2b61aa9 100644
--- a/python/pyspark/ml/image.py
+++ b/python/pyspark/ml/image.py
@@ -108,12 +108,23 @@ class _ImageSchema(object):
         """
         Converts an image to an array with metadata.

-        :param image: The image to be converted.
+        :param `Row` image: A row that contains the image to be converted. It should
+            have the attributes specified in `ImageSchema.imageSchema`.

         :return: a `numpy.ndarray` that is an image.

         .. versionadded:: 2.3.0
         """
+        if not isinstance(image, Row):
+            raise TypeError(
+                "image argument should be pyspark.sql.types.Row; however, "
+                "it got [%s]." % type(image))
+
+        if any(not hasattr(image, f) for f in self.imageFields):
+            raise ValueError(
+                "image argument should have attributes specified in "
+                "ImageSchema.imageSchema [%s]." % ", ".join(self.imageFields))
+
         height = image.height
         width = image.width
         nChannels = image.nChannels
@@ -127,15 +138,20 @@ class _ImageSchema(object):
         """
         Converts an array with metadata to a two-dimensional image.

-        :param array array: The array to convert to image.
+        :param `numpy.ndarray` array: The array to convert to image.
         :param str origin: Path to the image, optional.

         :return: a :class:`Row` that is a two dimensional image.

         .. versionadded:: 2.3.0
         """
+        if not isinstance(array, np.ndarray):
+            raise TypeError(
+                "array argument should be
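Taken together, the `.tobytes()` workaround and the new type check can be sketched as below (a minimal Python sketch; `to_image_data` is a hypothetical stand-in for the private helper logic in `image.py`, not Spark's actual API):

```python
import numpy as np


def to_image_data(array):
    """Convert an image array to raw bytes the way the follow-up does.

    Going through .tobytes() sidesteps the NumPy 1.13.3 / Python 3.6
    TypeError raised when bytearray() is fed an ndarray directly.
    """
    if not isinstance(array, np.ndarray):
        # Mirrors the new, clearer error message added by the patch.
        raise TypeError(
            "array argument should be numpy.ndarray; however, "
            "it got [%s]." % type(array))
    return bytearray(array.astype(dtype=np.uint8).ravel().tobytes())


print(to_image_data(np.array([[1, 2], [3, 4]])))  # bytearray(b'\x01\x02\x03\x04')
```

Passing a plain list now fails fast with the descriptive `TypeError` instead of an obscure NumPy error deep inside `bytearray()`.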
spark git commit: [SPARK-22585][CORE] Path in addJar is not url encoded
Repository: spark
Updated Branches: refs/heads/master 8ff474f6e -> ab6f60c4d

[SPARK-22585][CORE] Path in addJar is not url encoded

## What changes were proposed in this pull request?

This updates the behaviour of the `addJar` method of the `SparkContext` class: if a path without any scheme is passed as input, it is used literally, without url encoding/decoding it.

## How was this patch tested?

A unit test is added for this.

Author: Jakub Dubovsky

Closes #19834 from james64/SPARK-22585-encode-add-jar.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab6f60c4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab6f60c4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab6f60c4
Branch: refs/heads/master
Commit: ab6f60c4d6417cbb0240216a6b492aadcca3043e
Parents: 8ff474f
Author: Jakub Dubovsky
Authored: Thu Nov 30 10:24:30 2017 +0900
Committer: hyukjinkwon
Committed: Thu Nov 30 10:24:30 2017 +0900

--
 core/src/main/scala/org/apache/spark/SparkContext.scala  |  6 +-
 .../test/scala/org/apache/spark/SparkContextSuite.scala  | 11 +++
 2 files changed, 16 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ab6f60c4/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 984dd0a..c174939 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1837,7 +1837,11 @@ class SparkContext(config: SparkConf) extends Logging {
     Utils.validateURL(uri)
     uri.getScheme match {
       // A JAR file which exists only on the driver node
-      case null | "file" => addJarFile(new File(uri.getPath))
+      case null =>
+        // SPARK-22585 path without schema is not url encoded
+        addJarFile(new File(uri.getRawPath))
+      // A JAR file which exists only on the driver node
+      case "file" => addJarFile(new File(uri.getPath))
       // A JAR file which exists locally on every worker node
       case "local" => "file:" + uri.getPath
       case _ => path

http://git-wip-us.apache.org/repos/asf/spark/blob/ab6f60c4/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 0ed5f26..2bde875 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -309,6 +309,17 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     assert(sc.listJars().head.contains(tmpJar.getName))
   }

+  test("SPARK-22585 addJar argument without scheme is interpreted literally without url decoding") {
+    val tmpDir = new File(Utils.createTempDir(), "host%3A443")
+    tmpDir.mkdirs()
+    val tmpJar = File.createTempFile("t%2F", ".jar", tmpDir)
+
+    sc = new SparkContext("local", "test")
+
+    sc.addJar(tmpJar.getAbsolutePath)
+    assert(sc.listJars().size === 1)
+  }
+
   test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
     try {
       sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
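The difference between the decoded and raw path can be illustrated with Python's `urllib.parse` (an analogy only - the actual fix relies on Scala's `java.net.URI.getRawPath`, which, unlike `getPath`, does not percent-decode):

```python
from urllib.parse import unquote, urlparse

# A filesystem path that happens to contain percent-sequences,
# as in the new unit test ("host%3A443", "t%2F...jar").
raw_path = "/tmp/host%3A443/t%2Fsome.jar"

# Percent-decoding (what URI.getPath does) mangles a literal path:
decoded = unquote(raw_path)
print(decoded)  # /tmp/host:443/t/some.jar -- no longer the file that exists on disk

# Keeping the path as given (what URI.getRawPath does) preserves it;
# urlparse never percent-decodes, so the path round-trips unchanged:
assert urlparse(raw_path).path == raw_path
```

This is exactly why a scheme-less `addJar` argument must be taken literally: the percent-sequences are part of the real file name, not an encoding.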
[1/2] spark git commit: [SPARK-20650][CORE] Remove JobProgressListener.
Repository: spark Updated Branches: refs/heads/master 193555f79 -> 8ff474f6e http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala deleted file mode 100644 index 48be3be..000 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ /dev/null @@ -1,442 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.ui.jobs - -import java.util.Properties - -import org.scalatest.Matchers - -import org.apache.spark._ -import org.apache.spark.{LocalSparkContext, SparkConf, Success} -import org.apache.spark.executor._ -import org.apache.spark.scheduler._ -import org.apache.spark.ui.jobs.UIData.TaskUIData -import org.apache.spark.util.{AccumulatorContext, Utils} - -class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers { - - val jobSubmissionTime = 1421191042750L - val jobCompletionTime = 1421191296660L - - private def createStageStartEvent(stageId: Int) = { -val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, null, "") -SparkListenerStageSubmitted(stageInfo) - } - - private def createStageEndEvent(stageId: Int, failed: Boolean = false) = { -val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, null, "") -if (failed) { - stageInfo.failureReason = Some("Failed!") -} -SparkListenerStageCompleted(stageInfo) - } - - private def createJobStartEvent( - jobId: Int, - stageIds: Seq[Int], - jobGroup: Option[String] = None): SparkListenerJobStart = { -val stageInfos = stageIds.map { stageId => - new StageInfo(stageId, 0, stageId.toString, 0, null, null, "") -} -val properties: Option[Properties] = jobGroup.map { groupId => - val props = new Properties() - props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId) - props -} -SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos, properties.orNull) - } - - private def createJobEndEvent(jobId: Int, failed: Boolean = false) = { -val result = if (failed) JobFailed(new Exception("dummy failure")) else JobSucceeded -SparkListenerJobEnd(jobId, jobCompletionTime, result) - } - - private def runJob(listener: SparkListener, jobId: Int, shouldFail: Boolean = false) { -val stagesThatWontBeRun = jobId * 200 to jobId * 200 + 10 -val stageIds = jobId * 100 to jobId * 100 + 50 -listener.onJobStart(createJobStartEvent(jobId, stageIds ++ 
stagesThatWontBeRun)) -for (stageId <- stageIds) { - listener.onStageSubmitted(createStageStartEvent(stageId)) - listener.onStageCompleted(createStageEndEvent(stageId, failed = stageId % 2 == 0)) -} -listener.onJobEnd(createJobEndEvent(jobId, shouldFail)) - } - - private def assertActiveJobsStateIsEmpty(listener: JobProgressListener) { -listener.getSizesOfActiveStateTrackingCollections.foreach { case (fieldName, size) => - assert(size === 0, s"$fieldName was not empty") -} - } - - test("test LRU eviction of stages") { -def runWithListener(listener: JobProgressListener) : Unit = { - for (i <- 1 to 50) { -listener.onStageSubmitted(createStageStartEvent(i)) -listener.onStageCompleted(createStageEndEvent(i)) - } - assertActiveJobsStateIsEmpty(listener) -} -val conf = new SparkConf() -conf.set("spark.ui.retainedStages", 5.toString) -var listener = new JobProgressListener(conf) - -// Test with 5 retainedStages -runWithListener(listener) -listener.completedStages.size should be (5) -listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46)) - -// Test with 0 retainedStages -conf.set("spark.ui.retainedStages", 0.toString) -listener = new JobProgressListener(conf) -runWithListener(listener) -listener.completedStages.size should be (0) - } - - test("test clearing of stageIdToActiveJobs") { -val
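The retained-stage LRU eviction exercised by the deleted suite (50 completed stages, `spark.ui.retainedStages=5`, expecting stages 46-50 to survive) can be sketched generically (a toy Python model with hypothetical names, not Spark code):

```python
from collections import OrderedDict


class StageStore:
    """Toy model: keep only the most recent `retained` completed stages."""

    def __init__(self, retained):
        self.retained = retained
        self.completed = OrderedDict()  # insertion order = completion order

    def on_stage_completed(self, stage_id):
        self.completed[stage_id] = True
        # Trim from the oldest end until within the retention limit.
        while len(self.completed) > self.retained:
            self.completed.popitem(last=False)


store = StageStore(retained=5)
for i in range(1, 51):
    store.on_stage_completed(i)
print(sorted(store.completed))  # [46, 47, 48, 49, 50]
```

With `retained=0` the store trims every entry immediately, matching the second case in the test.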
[2/2] spark git commit: [SPARK-20650][CORE] Remove JobProgressListener.
[SPARK-20650][CORE] Remove JobProgressListener.

The only remaining use of this class was the SparkStatusTracker, which was modified to use the new status store. The test code to wait for executors was moved to TestUtils and now uses the SparkStatusTracker API.

Indirectly, ConsoleProgressBar also uses this data. Because it has some lower-latency requirements, a shortcut to efficiently get the active stages from the active listener was added to the AppStateStore.

Now that all UI code goes through the status store to get its data, the FsHistoryProvider can be cleaned up to only replay event logs when needed - that is, when there is no pre-existing disk store for the application.

As part of this change I also modified the streaming UI to read the needed data from the store, which was missed in the previous patch that made JobProgressListener redundant.

Author: Marcelo Vanzin

Closes #19750 from vanzin/SPARK-20650.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ff474f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ff474f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ff474f6
Branch: refs/heads/master
Commit: 8ff474f6e543203fac5d49af7fbe98a8a98da567
Parents: 193555f
Author: Marcelo Vanzin
Authored: Wed Nov 29 14:34:41 2017 -0800
Committer: Marcelo Vanzin
Committed: Wed Nov 29 14:34:41 2017 -0800

--
 .../scala/org/apache/spark/SparkContext.scala      |  11 +-
 .../org/apache/spark/SparkStatusTracker.scala      |  76 ++-
 .../main/scala/org/apache/spark/TestUtils.scala    |  26 +-
 .../deploy/history/FsHistoryProvider.scala         |  65 +-
 .../apache/spark/status/AppStatusListener.scala    |  51 +-
 .../apache/spark/status/AppStatusStore.scala       |  17 +-
 .../org/apache/spark/status/LiveEntity.scala       |   8 +-
 .../spark/status/api/v1/StagesResource.scala       |   1 -
 .../apache/spark/ui/ConsoleProgressBar.scala       |  18 +-
 .../spark/ui/jobs/JobProgressListener.scala        | 612 ---
 .../org/apache/spark/ui/jobs/StagePage.scala       |   1 -
 .../scala/org/apache/spark/ui/jobs/UIData.scala    | 311 --
 .../org/apache/spark/DistributedSuite.scala        |   2 +-
 .../spark/ExternalShuffleServiceSuite.scala        |   2 +-
 .../org/apache/spark/StatusTrackerSuite.scala      |   6 +-
 .../apache/spark/broadcast/BroadcastSuite.scala    |   2 +-
 .../spark/scheduler/DAGSchedulerSuite.scala        |   4 +-
 .../SparkListenerWithClusterSuite.scala            |   4 +-
 .../ui/jobs/JobProgressListenerSuite.scala         | 442 --
 project/MimaExcludes.scala                         |   2 +
 .../apache/spark/streaming/ui/BatchPage.scala      |  75 +--
 21 files changed, 208 insertions(+), 1528 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/8ff474f6/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 23fd54f..984dd0a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -58,7 +58,6 @@
 import org.apache.spark.status.{AppStatusPlugin, AppStatusStore}
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
 import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
-import org.apache.spark.ui.jobs.JobProgressListener
 import org.apache.spark.util._

 /**
@@ -195,7 +194,6 @@ class SparkContext(config: SparkConf) extends Logging {
   private var _eventLogCodec: Option[String] = None
   private var _listenerBus: LiveListenerBus = _
   private var _env: SparkEnv = _
-  private var _jobProgressListener: JobProgressListener = _
   private var _statusTracker: SparkStatusTracker = _
   private var _progressBar: Option[ConsoleProgressBar] = None
   private var _ui: Option[SparkUI] = None
@@ -270,8 +268,6 @@ class SparkContext(config: SparkConf) extends Logging {
     val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
     map.asScala
   }
-  private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener
-
   def statusTracker: SparkStatusTracker = _statusTracker

   private[spark] def progressBar: Option[ConsoleProgressBar] = _progressBar
@@ -421,11 +417,6 @@ class SparkContext(config: SparkConf) extends Logging {
     _listenerBus = new LiveListenerBus(_conf)

-    // "_jobProgressListener" should be set up before creating SparkEnv because when creating
-    // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
-    _jobProgressListener = new JobProgressListener(_conf)
-
spark git commit: [SPARK-18935][MESOS] Fix dynamic reservations on mesos
Repository: spark
Updated Branches: refs/heads/master 284836862 -> 193555f79

[SPARK-18935][MESOS] Fix dynamic reservations on mesos

## What changes were proposed in this pull request?

- Solves the issue described in the ticket by preserving reservation and allocation info in all cases (port handling included).
- Upgrades the Mesos dependency to 1.4.
- Adds extra debug-level logging to make debugging easier in the future; for example, we add reservation info when applicable:

```
17/09/29 14:53:07 DEBUG MesosCoarseGrainedSchedulerBackend: Accepting offer: f20de49b-dee3-45dd-a3c1-73418b7de891-O32 with attributes: Map() allocation info: role: "spark-prive" reservation info: name: "ports" type: RANGES ranges { range { begin: 31000 end: 32000 } } role: "spark-prive" reservation { principal: "test" } allocation_info { role: "spark-prive" }
```

- Some style cleanup.

## How was this patch tested?

Manually, by running the example in the ticket with and without a principal. Specifically, I tested it on a DC/OS 1.10 cluster with 7 nodes and played with reservations. From the master node, in order to reserve resources, I executed:

```
for i in 0 1 2 3 4 5 6
do
  curl -i \
    -d slaveId=90ec65ea-1f7b-479f-a824-35d2527d6d26-S$i \
    -d resources='[
      { "name": "cpus", "type": "SCALAR", "scalar": { "value": 2 }, "role": "spark-role", "reservation": { "principal": "" } },
      { "name": "mem", "type": "SCALAR", "scalar": { "value": 8026 }, "role": "spark-role", "reservation": { "principal": "" } }
    ]' \
    -X POST http://master.mesos:5050/master/reserve
done
```

Nodes had 4 cpus (m3.xlarge instances) and I reserved either 2 or 4 cpus (all for a role). I verified that tasks are launched on nodes with resources reserved under the `spark-role` role only if:

a) there are remaining resources for the (*) default role and the spark driver has no role assigned to it, or
b) the spark driver has a role assigned to it and it is the same role used in the reservations.

I also tested this locally on my machine.

Author: Stavros Kontopoulos

Closes #19390 from skonto/fix_dynamic_reservation.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/193555f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/193555f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/193555f7
Branch: refs/heads/master
Commit: 193555f79cc73873613674a09a7c371688b6dbc7
Parents: 2848368
Author: Stavros Kontopoulos
Authored: Wed Nov 29 14:15:35 2017 -0800
Committer: Marcelo Vanzin
Committed: Wed Nov 29 14:15:35 2017 -0800

--
 dev/deps/spark-deps-hadoop-2.6                     |  2 +-
 dev/deps/spark-deps-hadoop-2.7                     |  2 +-
 resource-managers/mesos/pom.xml                    |  2 +-
 .../cluster/mesos/MesosClusterScheduler.scala      |  1 -
 .../MesosCoarseGrainedSchedulerBackend.scala       | 17 +++-
 .../cluster/mesos/MesosSchedulerUtils.scala        | 99 +---
 6 files changed, 80 insertions(+), 43 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/dev/deps/spark-deps-hadoop-2.6
--
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 21c8a75..50ac6d1 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -138,7 +138,7 @@ lz4-java-1.4.0.jar
 machinist_2.11-0.6.1.jar
 macro-compat_2.11-1.1.1.jar
 mail-1.4.7.jar
-mesos-1.3.0-shaded-protobuf.jar
+mesos-1.4.0-shaded-protobuf.jar
 metrics-core-3.1.5.jar
 metrics-graphite-3.1.5.jar
 metrics-json-3.1.5.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/dev/deps/spark-deps-hadoop-2.7
--
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 7173426..1b1e316 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -139,7 +139,7 @@ lz4-java-1.4.0.jar
 machinist_2.11-0.6.1.jar
 macro-compat_2.11-1.1.1.jar
 mail-1.4.7.jar
-mesos-1.3.0-shaded-protobuf.jar
+mesos-1.4.0-shaded-protobuf.jar
 metrics-core-3.1.5.jar
 metrics-graphite-3.1.5.jar
 metrics-json-3.1.5.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/resource-managers/mesos/pom.xml
--
diff --git a/resource-managers/mesos/pom.xml b/resource-managers/mesos/pom.xml
index de8f1c9..70d0c17 100644
--- a/resource-managers/mesos/pom.xml
+++ b/resource-managers/mesos/pom.xml
@@ -29,7 +29,7 @@
   Spark Project Mesos
   mesos
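The role-matching rule verified above can be sketched roughly as follows (a hypothetical simplification for illustration, not the actual scheduler backend logic):

```python
def can_use_offer(offer_role, driver_role):
    """Reserved resources are only usable when the driver's role matches the
    reservation's role; unreserved ("*") resources are usable by any framework.
    A deliberately simplified model of the behaviour described in the test notes."""
    if offer_role in (None, "*"):     # unreserved (default-role) resources
        return True
    return driver_role == offer_role  # dynamically reserved resources


print(can_use_offer("spark-role", "spark-role"))  # True
print(can_use_offer("spark-role", None))          # False
```

This captures why a driver with no role assigned could only launch on the remaining unreserved resources, while a driver configured with `spark-role` could also consume the reserved ones.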
spark git commit: [SPARK-22608][SQL] add new API to CodeGeneration.splitExpressions()
Repository: spark
Updated Branches: refs/heads/master 57687280d -> 284836862

[SPARK-22608][SQL] add new API to CodeGeneration.splitExpressions()

## What changes were proposed in this pull request?

This PR adds a new API to `CodeGenerator.splitExpressions`, since several calls to `CodeGenerator.splitExpressions` are used with `ctx.INPUT_ROW`, to avoid code duplication.

## How was this patch tested?

Used existing test suites.

Author: Kazuaki Ishizaki

Closes #19821 from kiszk/SPARK-22608.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28483686
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28483686
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28483686
Branch: refs/heads/master
Commit: 284836862b2312aea5d7555c8e3c9d3c4dbc8eaf
Parents: 5768728
Author: Kazuaki Ishizaki
Authored: Thu Nov 30 01:19:37 2017 +0800
Committer: Wenchen Fan
Committed: Thu Nov 30 01:19:37 2017 +0800

--
 .../spark/sql/catalyst/expressions/Cast.scala      |  2 +-
 .../expressions/codegen/CodeGenerator.scala        | 24 +--
 .../sql/catalyst/expressions/predicates.scala      | 10 +++---
 .../expressions/stringExpressions.scala            | 32 ++--
 4 files changed, 37 insertions(+), 31 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/28483686/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 12baddf..8cafaef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -1040,7 +1040,7 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
       }
      """
     }
-    val fieldsEvalCodes = if (ctx.INPUT_ROW != null && ctx.currentVars == null) {
+    val fieldsEvalCodes = if (ctx.currentVars == null) {
       ctx.splitExpressions(
         expressions = fieldsEvalCode,
         funcName = "castStruct",

http://git-wip-us.apache.org/repos/asf/spark/blob/28483686/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 668c816..1645db1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -788,11 +788,31 @@ class CodegenContext {
    * @param expressions the codes to evaluate expressions.
    */
   def splitExpressions(expressions: Seq[String]): String = {
+    splitExpressions(expressions, funcName = "apply", extraArguments = Nil)
+  }
+
+  /**
+   * Similar to [[splitExpressions(expressions: Seq[String])]], but has customized function name
+   * and extra arguments.
+   *
+   * @param expressions the codes to evaluate expressions.
+   * @param funcName the split function name base.
+   * @param extraArguments the list of (type, name) of the arguments of the split function
+   *                       except for ctx.INPUT_ROW
+   */
+  def splitExpressions(
+      expressions: Seq[String],
+      funcName: String,
+      extraArguments: Seq[(String, String)]): String = {
     // TODO: support whole stage codegen
     if (INPUT_ROW == null || currentVars != null) {
-      return expressions.mkString("\n")
+      expressions.mkString("\n")
+    } else {
+      splitExpressions(
+        expressions,
+        funcName,
+        arguments = ("InternalRow", INPUT_ROW) +: extraArguments)
     }
-    splitExpressions(expressions, funcName = "apply", arguments = ("InternalRow", INPUT_ROW) :: Nil)
   }

   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/28483686/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index eb74753..1f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++
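The intent of splitting generated expression code into named helper functions can be modelled outside of Spark as below (a toy Python sketch with hypothetical names, not the Catalyst implementation - in Catalyst this exists to keep each generated Java method under the JVM's size limits):

```python
def split_expressions(expressions, func_name="apply", max_per_func=2):
    """Toy model: group generated statements into numbered helper functions
    (func_name serves as the name base) and return the helpers plus the calls
    that invoke them in order."""
    chunks = [expressions[i:i + max_per_func]
              for i in range(0, len(expressions), max_per_func)]
    funcs, calls = [], []
    for n, chunk in enumerate(chunks):
        name = "%s_%d" % (func_name, n)
        # Each helper takes the input row as its first argument, mirroring
        # how the real API always prepends ("InternalRow", INPUT_ROW).
        funcs.append("private void %s(InternalRow i) {\n  %s\n}"
                     % (name, "\n  ".join(chunk)))
        calls.append("%s(i);" % name)
    return funcs, calls


funcs, calls = split_expressions(["a = 1;", "b = 2;", "c = a + b;"])
print(calls)  # ['apply_0(i);', 'apply_1(i);']
```

The new overload in the patch is essentially this idea plus `extraArguments` for additional parameters beyond the input row.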
spark git commit: [SPARK-22615][SQL] Handle more cases in PropagateEmptyRelation
Repository: spark
Updated Branches: refs/heads/master 20b239845 -> 57687280d

[SPARK-22615][SQL] Handle more cases in PropagateEmptyRelation

## What changes were proposed in this pull request?

Currently, the optimizer rule `PropagateEmptyRelation` does not handle the following cases:

1. empty relation as right child in left outer join
2. empty relation as left child in right outer join
3. empty relation as right child in left semi join
4. empty relation as right child in left anti join
5. only one empty relation in full outer join

Cases 1, 2, and 5 can be treated as a **Cartesian product** and cause an exception. See the new test cases.

## How was this patch tested?

Unit test

Author: Wang Gengliang

Closes #19825 from gengliangwang/SPARK-22615.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/57687280
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/57687280
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/57687280
Branch: refs/heads/master
Commit: 57687280d4171db98d4d9404c7bd3374f51deac0
Parents: 20b2398
Author: Wang Gengliang
Authored: Wed Nov 29 09:17:39 2017 -0800
Committer: gatorsmile
Committed: Wed Nov 29 09:17:39 2017 -0800

--
 .../optimizer/PropagateEmptyRelation.scala         |  36 +++-
 .../optimizer/PropagateEmptyRelationSuite.scala    |  16 +-
 .../sql-tests/inputs/join-empty-relation.sql       |  28 +++
 .../results/join-empty-relation.sql.out            | 194 +++
 4 files changed, 257 insertions(+), 17 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/57687280/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index 52fbb4d..a6e5aa6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -41,6 +41,10 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
   private def empty(plan: LogicalPlan) =
     LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming)

+  // Construct a project list from plan's output, while the value is always NULL.
+  private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] =
+    plan.output.map{ a => Alias(Literal(null), a.name)(a.exprId) }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case p: Union if p.children.forall(isEmptyLocalRelation) =>
       empty(p)
@@ -49,16 +53,28 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
     // as stateful streaming joins need to perform other state management operations other than
     // just processing the input data.
     case p @ Join(_, _, joinType, _)
-        if !p.children.exists(_.isStreaming) && p.children.exists(isEmptyLocalRelation) =>
-      joinType match {
-        case _: InnerLike => empty(p)
-        // Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
-        // Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
-        case LeftOuter | LeftSemi | LeftAnti if isEmptyLocalRelation(p.left) => empty(p)
-        case RightOuter if isEmptyLocalRelation(p.right) => empty(p)
-        case FullOuter if p.children.forall(isEmptyLocalRelation) => empty(p)
-        case _ => p
-      }
+        if !p.children.exists(_.isStreaming) =>
+      val isLeftEmpty = isEmptyLocalRelation(p.left)
+      val isRightEmpty = isEmptyLocalRelation(p.right)
+      if (isLeftEmpty || isRightEmpty) {
+        joinType match {
+          case _: InnerLike => empty(p)
+          // Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
+          // Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
+          case LeftOuter | LeftSemi | LeftAnti if isLeftEmpty => empty(p)
+          case LeftSemi if isRightEmpty => empty(p)
+          case LeftAnti if isRightEmpty => p.left
+          case FullOuter if isLeftEmpty && isRightEmpty => empty(p)
+          case LeftOuter | FullOuter if isRightEmpty =>
+            Project(p.left.output ++ nullValueProjectList(p.right), p.left)
+          case RightOuter if isRightEmpty => empty(p)
+          case RightOuter | FullOuter if isLeftEmpty =>
+            Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
+          case _ => p
+        }
+      } else {
+        p
+      }

     case p:
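The case analysis for joins with an empty side can be summarised as a small decision table (a toy Python model of the rule's outcomes; names and return values are illustrative, not Spark's API - `*_PADDED_WITH_NULLS` stands for the `Project` over the surviving side with NULL columns for the empty side):

```python
def propagate_empty(join_type, left_empty, right_empty):
    """Decision table mirroring the match-case order of the optimizer rule."""
    if not (left_empty or right_empty):
        return "UNCHANGED"
    if join_type == "inner":
        return "EMPTY"
    if join_type in ("left_outer", "left_semi", "left_anti") and left_empty:
        return "EMPTY"
    if join_type == "left_semi" and right_empty:
        return "EMPTY"
    if join_type == "left_anti" and right_empty:
        return "LEFT"                       # anti join with empty right keeps all left rows
    if join_type == "full_outer" and left_empty and right_empty:
        return "EMPTY"
    if join_type in ("left_outer", "full_outer") and right_empty:
        return "LEFT_PADDED_WITH_NULLS"     # left rows, NULLs for right columns
    if join_type == "right_outer" and right_empty:
        return "EMPTY"
    if join_type in ("right_outer", "full_outer") and left_empty:
        return "RIGHT_PADDED_WITH_NULLS"    # right rows, NULLs for left columns
    return "UNCHANGED"


print(propagate_empty("left_anti", left_empty=False, right_empty=True))  # LEFT
```

Note that the branch order matters, exactly as in the Scala `match`: for example, a left anti join whose *left* side is empty collapses to an empty relation before the "empty right keeps the left side" branch is ever reached.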
spark git commit: [SPARK-22605][SQL] SQL write job should also set Spark task output metrics
Repository: spark
Updated Branches: refs/heads/master e9b2070ab -> 20b239845

[SPARK-22605][SQL] SQL write job should also set Spark task output metrics

## What changes were proposed in this pull request?

For SQL write jobs, we only set metrics for the SQL listener and display them in the SQL plan UI. We should also set metrics for Spark task output metrics, which will be shown in the Spark job UI.

## How was this patch tested?

Tested it manually. For a simple write job

```
spark.range(1000).write.parquet("/tmp/p1")
```

now the Spark job UI looks like

![ui](https://user-images.githubusercontent.com/3182036/33326478-05a25b7c-d490-11e7-96ef-806117774356.jpg)

Author: Wenchen Fan

Closes #19833 from cloud-fan/ui.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20b23984
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20b23984
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20b23984
Branch: refs/heads/master
Commit: 20b239845b695fe6a893ebfe97b49ef05fae773d
Parents: e9b2070
Author: Wenchen Fan
Authored: Wed Nov 29 19:18:47 2017 +0800
Committer: Wenchen Fan
Committed: Wed Nov 29 19:18:47 2017 +0800

--
 .../execution/datasources/BasicWriteStatsTracker.scala | 10 --
 1 file changed, 8 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/20b23984/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
index 11af0aa..9dbbe99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
@@ -22,7 +22,7 @@ import java.io.FileNotFoundException

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path

-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.SQLExecution
@@ -44,7 +44,6 @@ case class BasicWriteTaskStats(

 /**
  * Simple [[WriteTaskStatsTracker]] implementation that produces [[BasicWriteTaskStats]].
- * @param hadoopConf
  */
 class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
   extends WriteTaskStatsTracker with Logging {
@@ -106,6 +105,13 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)

   override def getFinalStats(): WriteTaskStats = {
     statCurrentFile()
+
+    // Reports bytesWritten and recordsWritten to the Spark output metrics.
+    Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { outputMetrics =>
+      outputMetrics.setBytesWritten(numBytes)
+      outputMetrics.setRecordsWritten(numRows)
+    }
+
     if (submittedFiles != numFiles) {
       logInfo(s"Expected $submittedFiles files, but only saw $numFiles. " +
         "This could be due to the output format not writing empty files, " +