[spark] branch branch-2.4 updated: [SPARK-29263][CORE][TEST][FOLLOWUP][2.4] Fix build failure of `TaskSchedulerImplSuite`
vanzin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 9ae7393  [SPARK-29263][CORE][TEST][FOLLOWUP][2.4] Fix build failure of `TaskSchedulerImplSuite`
9ae7393 is described below

commit 9ae73932bb749bde6b71cbe6cf595ec2d23b60ea
Author: Xingbo Jiang
AuthorDate: Fri Sep 27 16:31:23 2019 -0700

    [SPARK-29263][CORE][TEST][FOLLOWUP][2.4] Fix build failure of `TaskSchedulerImplSuite`

    ### What changes were proposed in this pull request?

    https://github.com/apache/spark/pull/25946 fixed a bug and modified `TaskSchedulerImplSuite`; when backported to 2.4 it breaks the build. This PR fixes the broken test build.

    ### How was this patch tested?

    Passed locally.

    Closes #25952 from jiangxb1987/SPARK-29263.

    Authored-by: Xingbo Jiang
    Signed-off-by: Marcelo Vanzin
---
 .../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala    | 10 +++---
 .../scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala |  2 +-
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 5c0601eb03..ecbb6ab 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -77,7 +77,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   }

   def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = {
-    val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
+    setupSchedulerWithMaster("local", confs: _*)
+  }
+
+  def setupSchedulerWithMaster(master: String, confs: (String, String)*): TaskSchedulerImpl = {
+    val conf = new SparkConf().setMaster(master).setAppName("TaskSchedulerImplSuite")
     confs.foreach { case (k, v) => conf.set(k, v) }
     sc = new SparkContext(conf)
     taskScheduler = new TaskSchedulerImpl(sc)
@@ -1129,7 +1133,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // you'd need the previous stage to also get restarted, and then succeed, in between each
     // attempt, but that happens outside what we're mocking here.)
     val zombieAttempts = (0 until 2).map { stageAttempt =>
-      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
+      val attempt = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = stageAttempt)
       taskScheduler.submitTasks(attempt)
       val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
       val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
@@ -1148,7 +1152,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // we've now got 2 zombie attempts, each with 9 tasks still active.  Submit the 3rd attempt for
     // the stage, but this time with insufficient resources so not all tasks are active.
-    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+    val finalAttempt = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 2)
     taskScheduler.submitTasks(finalAttempt)
     val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
     val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index d264ada..93a4b1f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1398,7 +1398,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(taskSetManager1.isZombie)
     assert(taskSetManager1.runningTasks === 9)

-    val taskSet2 = FakeTask.createTaskSet(10, stageAttemptId = 1)
+    val taskSet2 = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 1)
     sched.submitTasks(taskSet2)
     sched.resourceOffers(
       (11 until 20).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) })
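To illustrate why the backport needed this follow-up, here is a self-contained sketch (not Spark code) of the compile-error pattern involved: when a test helper gains a new required parameter, call sites that passed arguments by name stop compiling until they supply it. The class and object below are hypothetical, chosen only to mirror the shape of the suite's helpers.

```scala
object OverloadDemo {
  case class TaskSet(numTasks: Int, stageId: Int, stageAttemptId: Int)

  // Backported helper: `stageId` is now a required parameter.
  // The pre-backport 2.4 helper had the shape
  //   def createTaskSet(numTasks: Int, stageAttemptId: Int): TaskSet
  def createTaskSet(numTasks: Int, stageId: Int, stageAttemptId: Int): TaskSet =
    TaskSet(numTasks, stageId, stageAttemptId)

  def main(args: Array[String]): Unit = {
    // createTaskSet(10, stageAttemptId = 2)                       // no longer compiles
    val fixed = createTaskSet(10, stageId = 0, stageAttemptId = 2) // updated call site
    println(fixed)
  }
}
```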
[spark] branch master updated: [SPARK-27254][SS] Cleanup complete but invalid output files in ManifestFileCommitProtocol if job is aborted
zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new d72f398  [SPARK-27254][SS] Cleanup complete but invalid output files in ManifestFileCommitProtocol if job is aborted
d72f398 is described below

commit d72f39897b00d0bbd7a4db9de281a1256fcf908d
Author: Jungtaek Lim (HeartSaVioR)
AuthorDate: Fri Sep 27 12:35:26 2019 -0700

    [SPARK-27254][SS] Cleanup complete but invalid output files in ManifestFileCommitProtocol if job is aborted

    ## What changes were proposed in this pull request?

    SPARK-27210 enables ManifestFileCommitProtocol to clean up incomplete output files at the task level if a task is aborted. This patch extends that cleanup: ManifestFileCommitProtocol now also cleans up complete but invalid output files at the job level if the job aborts. Please note that this works as 'best-effort', not as a guarantee, as in HadoopMapReduceCommitProtocol.

    ## How was this patch tested?

    Added UT.

    Closes #24186 from HeartSaVioR/SPARK-27254.

    Lead-authored-by: Jungtaek Lim (HeartSaVioR)
    Co-authored-by: Jungtaek Lim (HeartSaVioR)
    Signed-off-by: Shixiong Zhu
---
 .../streaming/ManifestFileCommitProtocol.scala     | 37 ++-
 .../spark/sql/streaming/FileStreamSinkSuite.scala  | 74 ++
 2 files changed, 109 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 916bd2d..f6cc811 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution.streaming

+import java.io.IOException
 import java.util.UUID

 import scala.collection.mutable.ArrayBuffer
@@ -43,6 +44,8 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
   @transient private var fileLog: FileStreamSinkLog = _
   private var batchId: Long = _

+  @transient private var pendingCommitFiles: ArrayBuffer[Path] = _
+
   /**
    * Sets up the manifest log output and the batch id for this job.
    * Must be called before any other function.
@@ -54,13 +57,21 @@ class ManifestFileCommitProtocol(jobId: String, path: String)

   override def setupJob(jobContext: JobContext): Unit = {
     require(fileLog != null, "setupManifestOptions must be called before this function")
-    // Do nothing
+    pendingCommitFiles = new ArrayBuffer[Path]
   }

   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
     require(fileLog != null, "setupManifestOptions must be called before this function")
     val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[SinkFileStatus]]).toArray

+    // We shouldn't remove the files if they're written to the metadata:
+    // `fileLog.add(batchId, fileStatuses)` could fail AFTER writing files to the metadata
+    // as well as there could be race
+    // so for the safety we clean up the list before calling anything incurs exception.
+    // The case is uncommon and we do best effort instead of guarantee, so the simplicity of
+    // logic here would be OK, and safe for dealing with unexpected situations.
+    pendingCommitFiles.clear()
+
     if (fileLog.add(batchId, fileStatuses)) {
       logInfo(s"Committed batch $batchId")
     } else {
@@ -70,7 +81,29 @@ class ManifestFileCommitProtocol(jobId: String, path: String)

   override def abortJob(jobContext: JobContext): Unit = {
     require(fileLog != null, "setupManifestOptions must be called before this function")
-    // Do nothing
+    // Best effort cleanup of complete files from failed job.
+    // Since the file has UUID in its filename, we are safe to try deleting them
+    // as the file will not conflict with file with another attempt on the same task.
+    if (pendingCommitFiles.nonEmpty) {
+      pendingCommitFiles.foreach { path =>
+        try {
+          val fs = path.getFileSystem(jobContext.getConfiguration)
+          // this is to make sure the file can be seen from driver as well
+          if (fs.exists(path)) {
+            fs.delete(path, false)
+          }
+        } catch {
+          case e: IOException =>
+            logWarning(s"Fail to remove temporary file $path, continue removing next.", e)
+        }
+      }
+      pendingCommitFiles.clear()
+    }
+  }
+
+  override def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {
+    pendingCommitFiles ++= taskCommit.obj.asInstanceOf[Seq[SinkFileStatus]]
+      .map(_.toFileStatus.getPath)
   }

   override def setupTask(tas
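For context, ManifestFileCommitProtocol backs the file-based streaming sink, so the new best-effort cleanup runs when a micro-batch write job aborts. Below is a minimal, self-contained sketch of a query that exercises that sink; the output and checkpoint paths are placeholders, not values from the patch.

```scala
import org.apache.spark.sql.SparkSession

object FileSinkDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("FileSinkDemo")
      .getOrCreate()

    // The built-in "rate" source generates (timestamp, value) rows for testing.
    val query = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()
      .writeStream
      .format("parquet")
      .option("path", "/tmp/file-sink-demo/output")             // placeholder path
      .option("checkpointLocation", "/tmp/file-sink-demo/ckpt") // placeholder path
      .start()

    // If a batch's write job aborts, the protocol now deletes files that
    // finished tasks had already committed, instead of leaving them behind.
    query.awaitTermination(10000L)
    query.stop()
    spark.stop()
  }
}
```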
[spark] branch master updated (420abb4 -> 233c214)
vanzin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 420abb4  [SPARK-29263][SCHEDULER] Update `availableSlots` in `resourceOffers()` before checking available slots for barrier taskSet
     add 233c214  [SPARK-29070][CORE] Make SparkLauncher log full spark-submit command line

No new revisions were added by this update.

Summary of changes:
 .../src/main/java/org/apache/spark/launcher/SparkLauncher.java | 8
 1 file changed, 8 insertions(+)
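For reference, a minimal sketch of driving `SparkLauncher` from user code; the jar path and main class are placeholders. With SPARK-29070, the launcher also logs the full spark-submit command line it builds from these settings.

```scala
import org.apache.spark.launcher.SparkLauncher

object LauncherDemo {
  def main(args: Array[String]): Unit = {
    // Placeholder application jar and entry point.
    val handle = new SparkLauncher()
      .setAppResource("/path/to/my-app.jar")
      .setMainClass("com.example.MyApp")
      .setMaster("local[2]")
      .setConf(SparkLauncher.DRIVER_MEMORY, "1g")
      .startApplication()

    // Wait until the launched application reaches a terminal state.
    while (!handle.getState.isFinal) {
      Thread.sleep(500)
    }
    println(s"final state: ${handle.getState}")
  }
}
```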
[spark] branch branch-2.4 updated: [SPARK-29263][SCHEDULER] Update `availableSlots` in `resourceOffers()` before checking available slots for barrier taskSet
jiangxb1987 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 99e503c  [SPARK-29263][SCHEDULER] Update `availableSlots` in `resourceOffers()` before checking available slots for barrier taskSet
99e503c is described below

commit 99e503cebfd9cb19372c88b0dd70c6743f864454
Author: Juliusz Sompolski
AuthorDate: Fri Sep 27 11:18:32 2019 -0700

    [SPARK-29263][SCHEDULER] Update `availableSlots` in `resourceOffers()` before checking available slots for barrier taskSet

    ### What changes were proposed in this pull request?

    `availableSlots` is computed before the loop over all TaskSets in `resourceOffers`, but the number of free slots changes in every iteration as slots are taken. The number of available slots checked by a barrier task set therefore has to be recomputed in every iteration from `availableCpus`.

    ### Why are the changes needed?

    Bugfix. Without it, `resourceOffers` could attempt to start a barrier task set even though it does not have enough slots available. That would then be caught by the `require` in line 519, which throws an exception; the exception gets caught and ignored by Dispatcher's MessageLoop, so nothing terrible happens, but it prevents `resourceOffers` from considering further TaskSets. Note that launching the barrier TaskSet can still fail if other requirements are not satisfied, and can still be rolled back by throwing an exception in this `require`. Handling it more gracefully remains a TODO in SPARK-24818, but this fix at least resolves the situation where the task set cannot launch because of insufficient slots.

    ### Does this PR introduce any user-facing change?

    No

    ### How was this patch tested?

    Added UT

    Closes #23375
    Closes #25946 from juliuszsompolski/SPARK-29263.

    Authored-by: Juliusz Sompolski
    Signed-off-by: Xingbo Jiang
    (cherry picked from commit 420abb457df0f422f73bab19a6ed6d7c6bab3173)
    Signed-off-by: Xingbo Jiang
---
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  2 +-
 .../org/apache/spark/scheduler/FakeTask.scala      | 36 +++
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 51 --
 3 files changed, 65 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index e194b79..38dbbe7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -391,7 +391,6 @@ private[spark] class TaskSchedulerImpl(
     // Build a list of tasks to assign to each worker.
     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
-    val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
     val sortedTaskSets = rootPool.getSortedTaskSetQueue
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
@@ -405,6 +404,7 @@ private[spark] class TaskSchedulerImpl(
     // of locality levels so that it gets a chance to launch local tasks on all of them.
     // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
     for (taskSet <- sortedTaskSets) {
+      val availableSlots = availableCpus.map(c => c / CPUS_PER_TASK).sum
       // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
       if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
         // Skip the launch process.

diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index b29d32f..abc8841 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -42,15 +42,23 @@ object FakeTask {
    * locations for each task (given as varargs) if this sequence is not empty.
    */
   def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
-    createTaskSet(numTasks, stageAttemptId = 0, prefLocs: _*)
+    createTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0, prefLocs: _*)
   }

-  def createTaskSet(numTasks: Int, stageAttemptId: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
-    createTaskSet(numTasks, stageId = 0, stageAttemptId, prefLocs: _*)
+  def createTaskSet(
+      numTasks: Int,
+      stageId: Int,
+      stageAttemptId: Int,
+      prefLocs: Seq[TaskLocation]*): TaskSet = {
+    createTaskSet(numTasks, stageId, stageAttemptId, prior
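To make the accounting issue concrete, here is a simplified, self-contained model of the loop described above — not the real TaskSchedulerImpl — showing why the slot count must be recomputed from the remaining CPUs on every iteration.

```scala
object SlotAccountingDemo {
  def main(args: Array[String]): Unit = {
    val CPUS_PER_TASK = 1
    val availableCpus = Array(2, 2, 2) // three hypothetical executors, 6 cores total

    // Buggy behavior: slots summed once, before the task-set loop.
    val slotsComputedOnce = availableCpus.map(_ / CPUS_PER_TASK).sum

    // (name, number of tasks, isBarrier)
    val taskSets = Seq(("normal", 4, false), ("barrier", 4, true))
    taskSets.foreach { case (name, numTasks, isBarrier) =>
      // Fixed behavior: recompute from the cores still free in this iteration.
      val availableSlots = availableCpus.map(_ / CPUS_PER_TASK).sum
      if (isBarrier && availableSlots < numTasks) {
        println(s"skip $name: only $availableSlots slots left " +
          s"(the stale count would have claimed $slotsComputedOnce)")
      } else {
        // "Launch" tasks by consuming one core per task.
        var remaining = numTasks
        for (i <- availableCpus.indices) {
          while (remaining > 0 && availableCpus(i) >= CPUS_PER_TASK) {
            availableCpus(i) -= CPUS_PER_TASK
            remaining -= 1
          }
        }
      }
    }
  }
}
```

Running this sketch, the first (non-barrier) task set consumes 4 of the 6 cores, so the barrier set correctly sees only 2 remaining slots and is skipped, whereas the stale count of 6 would have let it attempt to launch.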
[spark] branch master updated (fda0e6e -> 420abb4)
jiangxb1987 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from fda0e6e  [SPARK-29240][PYTHON] Pass Py4J column instance to support PySpark column in element_at function
     add 420abb4  [SPARK-29263][SCHEDULER] Update `availableSlots` in `resourceOffers()` before checking available slots for barrier taskSet

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  2 +-
 .../org/apache/spark/scheduler/FakeTask.scala      | 36 +++
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 51 --
 3 files changed, 65 insertions(+), 24 deletions(-)
[spark] branch master updated (4bffcf5 -> fda0e6e)
dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 4bffcf5  [SPARK-29275][SQL][DOC] Describe special date/timestamp values in the SQL migration guide
     add fda0e6e  [SPARK-29240][PYTHON] Pass Py4J column instance to support PySpark column in element_at function

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/functions.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
[spark] branch branch-2.4 updated: [SPARK-29240][PYTHON] Pass Py4J column instance to support PySpark column in element_at function
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 361b605  [SPARK-29240][PYTHON] Pass Py4J column instance to support PySpark column in element_at function
361b605 is described below

commit 361b605eeb614e14977f81682d54ba94327280d3
Author: HyukjinKwon
AuthorDate: Fri Sep 27 11:04:55 2019 -0700

    [SPARK-29240][PYTHON] Pass Py4J column instance to support PySpark column in element_at function

    ### What changes were proposed in this pull request?

    This PR makes `element_at` in PySpark able to take PySpark `Column` instances.

    ### Why are the changes needed?

    To match the Scala side. This seems to have been intended but was not working correctly due to a bug.

    ### Does this PR introduce any user-facing change?

    Yes. See below:

    ```python
    from pyspark.sql import functions as F
    x = spark.createDataFrame([([1,2,3],1),([4,5,6],2),([7,8,9],3)],['list','num'])
    x.withColumn('aa',F.element_at('list',x.num.cast('int'))).show()
    ```

    Before:

    ```
    Traceback (most recent call last):
      File "", line 1, in
      File "/.../spark/python/pyspark/sql/functions.py", line 2059, in element_at
        return Column(sc._jvm.functions.element_at(_to_java_column(col), extraction))
      File "/.../spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1277, in __call__
      File "/.../spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1241, in _build_args
      File "/.../spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1228, in _get_args
      File "/.../forked/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_collections.py", line 500, in convert
      File "/.../spark/python/pyspark/sql/column.py", line 344, in __iter__
        raise TypeError("Column is not iterable")
    TypeError: Column is not iterable
    ```

    After:

    ```
    +---------+---+---+
    |     list|num| aa|
    +---------+---+---+
    |[1, 2, 3]|  1|  1|
    |[4, 5, 6]|  2|  5|
    |[7, 8, 9]|  3|  9|
    +---------+---+---+
    ```

    ### How was this patch tested?

    Manually tested against literal, Python native types, and PySpark column.

    Closes #25950 from HyukjinKwon/SPARK-29240.

    Authored-by: HyukjinKwon
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit fda0e6e48d00a1ba8e9d41d7670b3ad3c6951492)
    Signed-off-by: Dongjoon Hyun
---
 python/pyspark/sql/functions.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 3833746..069354e 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1990,11 +1990,12 @@ def element_at(col, extraction):
     [Row(element_at(data, 1)=u'a'), Row(element_at(data, 1)=None)]

     >>> df = spark.createDataFrame([({"a": 1.0, "b": 2.0},), ({},)], ['data'])
-    >>> df.select(element_at(df.data, "a")).collect()
+    >>> df.select(element_at(df.data, lit("a"))).collect()
     [Row(element_at(data, a)=1.0), Row(element_at(data, a)=None)]
     """
     sc = SparkContext._active_spark_context
-    return Column(sc._jvm.functions.element_at(_to_java_column(col), extraction))
+    return Column(sc._jvm.functions.element_at(
+        _to_java_column(col), lit(extraction)._jc))  # noqa: F821 'lit' is dynamically defined.
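The Scala API that the Python fix now mirrors already accepts a Column for the index; a small, self-contained example is shown below, with column names chosen to match the Python snippet above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.element_at

object ElementAtDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("ElementAtDemo")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((Seq(1, 2, 3), 1), (Seq(4, 5, 6), 2), (Seq(7, 8, 9), 3)).toDF("list", "num")

    // element_at is 1-based; passing a Column selects a per-row index,
    // which is what the PySpark fix enables on the Python side.
    df.withColumn("aa", element_at($"list", $"num".cast("int"))).show()
    spark.stop()
  }
}
```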
[spark] branch master updated (cc852d4 -> 4bffcf5)
dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from cc852d4  [SPARK-29015][SQL][TEST-HADOOP3.2] Reset class loader after initializing SessionState for built-in Hive 2.3
     add 4bffcf5  [SPARK-29275][SQL][DOC] Describe special date/timestamp values in the SQL migration guide

No new revisions were added by this update.

Summary of changes:
 docs/sql-migration-guide.md | 14 ++
 1 file changed, 14 insertions(+)
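As a rough illustration of what that new migration-guide section covers — assuming the special values are the ones later Spark releases document, such as `epoch`, `today`, `yesterday`, `tomorrow`, and `now` — a query like the following accepts them as literals; consult the guide itself for the authoritative list and semantics. The `spark` session is assumed to exist.

```scala
// Assumes a SparkSession named `spark` built from master at this point in time.
spark.sql("SELECT DATE 'today' AS d_today, TIMESTAMP 'now' AS ts_now")
  .show(truncate = false)
```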
[spark] branch master updated (4dd0066 -> cc852d4)
srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 4dd0066  [SPARK-21914][SQL][TESTS] Check results of expression examples
     add cc852d4  [SPARK-29015][SQL][TEST-HADOOP3.2] Reset class loader after initializing SessionState for built-in Hive 2.3

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 9 -
 1 file changed, 8 insertions(+), 1 deletion(-)
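The commit title describes saving and restoring the thread context class loader around Hive SessionState initialization. Below is a generic, self-contained sketch of that technique; it illustrates the pattern only and is not the actual HiveClientImpl change.

```scala
object ClassLoaderRestoreDemo {
  // Run `body`, then restore whatever context class loader the thread had before,
  // even if `body` (e.g. a SessionState-style initialization) replaced it.
  def withRestoredContextClassLoader[T](body: => T): T = {
    val original = Thread.currentThread().getContextClassLoader
    try {
      body
    } finally {
      Thread.currentThread().setContextClassLoader(original)
    }
  }

  def main(args: Array[String]): Unit = {
    val before = Thread.currentThread().getContextClassLoader
    withRestoredContextClassLoader {
      // Simulate an initializer that swaps the context class loader.
      Thread.currentThread().setContextClassLoader(
        new java.net.URLClassLoader(Array.empty[java.net.URL], before))
    }
    assert(Thread.currentThread().getContextClassLoader eq before)
    println("context class loader restored")
  }
}
```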
[spark] branch master updated (bd28e8e -> 4dd0066)
gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from bd28e8e  [SPARK-29213][SQL] Generate extra IsNotNull predicate in FilterExec
     add 4dd0066  [SPARK-21914][SQL][TESTS] Check results of expression examples

No new revisions were added by this update.

Summary of changes:
 .../aggregate/ApproximatePercentile.scala          |  2 +-
 .../expressions/aggregate/CentralMomentAgg.scala   |  6 +--
 .../sql/catalyst/expressions/arithmetic.scala      |  2 +-
 .../catalyst/expressions/complexTypeCreator.scala  |  4 +-
 .../sql/catalyst/expressions/csvExpressions.scala  |  6 +--
 .../catalyst/expressions/datetimeExpressions.scala | 10 ++--
 .../sql/catalyst/expressions/generators.scala      | 24 -
 .../spark/sql/catalyst/expressions/grouping.scala  | 60 +++---
 .../expressions/higherOrderFunctions.scala         |  2 +-
 .../sql/catalyst/expressions/jsonExpressions.scala | 10 ++--
 .../sql/catalyst/expressions/mathExpressions.scala |  4 +-
 .../catalyst/expressions/regexpExpressions.scala   | 15 +++---
 .../catalyst/expressions/stringExpressions.scala   | 14 ++---
 .../spark/sql/catalyst/expressions/xml/xpath.scala |  2 +-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 52 ++-
 15 files changed, 133 insertions(+), 80 deletions(-)
[spark] branch branch-2.4 updated: [SPARK-29213][SQL] Generate extra IsNotNull predicate in FilterExec
wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 3dbe065  [SPARK-29213][SQL] Generate extra IsNotNull predicate in FilterExec
3dbe065 is described below

commit 3dbe06561c3d645182b1b512ae8d545056b7613b
Author: Wang Shuo
AuthorDate: Fri Sep 27 15:14:17 2019 +0800

    [SPARK-29213][SQL] Generate extra IsNotNull predicate in FilterExec

    Currently the behavior of getting the output and generating null checks in `FilterExec` is different, so some nullable attributes could be treated as not nullable by mistake.

    In `FilterExec.output`, an attribute is marked as nullable or not by finding its `exprId` in notNullAttributes:
    ```
    a.nullable && notNullAttributes.contains(a.exprId)
    ```
    But in `FilterExec.doConsume`, whether a `nullCheck` is generated for a predicate is decided by whether there is a semantically equal not-null predicate:
    ```
    val nullChecks = c.references.map { r =>
      val idx = notNullPreds.indexWhere { n => n.asInstanceOf[IsNotNull].child.semanticEquals(r)}
      if (idx != -1 && !generatedIsNotNullChecks(idx)) {
        generatedIsNotNullChecks(idx) = true
        // Use the child's output. The nullability is what the child produced.
        genPredicate(notNullPreds(idx), input, child.output)
      } else {
        ""
      }
    }.mkString("\n").trim
    ```
    An NPE will happen when running the SQL below:
    ```
    sql("create table table1(x string)")
    sql("create table table2(x bigint)")
    sql("create table table3(x string)")
    sql("insert into table2 select null as x")
    sql(
      """
        |select t1.x
        |from (
        |select x from table1) t1
        |left join (
        |select x from (
        |select x from table2
        |union all
        |select substr(x,5) x from table3
        |) a
        |where length(x)>0
        |) t3
        |on t1.x=t3.x
      """.stripMargin).collect()
    ```
    NPE Exception:
    ```
    java.lang.NullPointerException
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:40)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:135)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:94)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:449)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:452)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    ```
    the generated code:
    ```
    == Subtree 4 / 5 ==
    *(2) Project [cast(x#7L as string) AS x#9]
    +- *(2) Filter ((length(cast(x#7L as string)) > 0) AND isnotnull(cast(x#7L as string)))
       +- Scan hive default.table2 [x#7L], HiveTableRelation `default`.`table2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [x#7L]

    Generated code:
    /* 001 */ public Object generate(Object[] references) {
    /* 002 */   return new GeneratedIteratorForCodegenStage2(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ // codegenStageId=2
    /* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 007 */   private Object[] references;
    /* 008 */   private scala.collection.Iterator[] inputs;
    /* 009 */   private scala.collection.Iterator inputadapter_input_0;
    /* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] filter_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
    /* 011 */
    /* 012 */   public GeneratedIteratorForCodegenStage2(Object[] references) {
    /* 013 */     this.references = references;
    /* 014 */   }
    /* 015 */
    /* 016 */   public void init(int index, scala.collection.Iterator[] inputs) {
    /* 017 */     partitionIndex = index;
    /* 018 *
[spark] branch master updated (aed7ff3 -> bd28e8e)
wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from aed7ff3  [SPARK-29258][ML][PYSPARK] parity between ml.evaluator and mllib.metrics
     add bd28e8e  [SPARK-29213][SQL] Generate extra IsNotNull predicate in FilterExec

No new revisions were added by this update.

Summary of changes:
 .../sql/execution/basicPhysicalOperators.scala     |  5 +
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 26 ++
 2 files changed, 31 insertions(+)