[spark] branch branch-2.4 updated: [SPARK-29263][CORE][TEST][FOLLOWUP][2.4] Fix build failure of `TaskSchedulerImplSuite`

2019-09-27 Thread vanzin
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 9ae7393  [SPARK-29263][CORE][TEST][FOLLOWUP][2.4] Fix build failure of `TaskSchedulerImplSuite`
9ae7393 is described below

commit 9ae73932bb749bde6b71cbe6cf595ec2d23b60ea
Author: Xingbo Jiang 
AuthorDate: Fri Sep 27 16:31:23 2019 -0700

[SPARK-29263][CORE][TEST][FOLLOWUP][2.4] Fix build failure of `TaskSchedulerImplSuite`

### What changes were proposed in this pull request?

https://github.com/apache/spark/pull/25946 fixed a bug and modified `TaskSchedulerImplSuite`; when backported to 2.4 it broke the build. This PR fixes the broken test build.

### How was this patch tested?

Passed locally.

Closes #25952 from jiangxb1987/SPARK-29263.

Authored-by: Xingbo Jiang 
Signed-off-by: Marcelo Vanzin 
---
 .../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala    | 10 +++++++---
 .../scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala |  2 +-
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 5c0601eb03..ecbb6ab 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -77,7 +77,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   }
 
   def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = {
-    val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
+    setupSchedulerWithMaster("local", confs: _*)
+  }
+
+  def setupSchedulerWithMaster(master: String, confs: (String, String)*): TaskSchedulerImpl = {
+    val conf = new SparkConf().setMaster(master).setAppName("TaskSchedulerImplSuite")
     confs.foreach { case (k, v) => conf.set(k, v) }
     sc = new SparkContext(conf)
     taskScheduler = new TaskSchedulerImpl(sc)
@@ -1129,7 +1133,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // you'd need the previous stage to also get restarted, and then succeed, in between each
     // attempt, but that happens outside what we're mocking here.)
     val zombieAttempts = (0 until 2).map { stageAttempt =>
-      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
+      val attempt = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = stageAttempt)
       taskScheduler.submitTasks(attempt)
       val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
       val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
@@ -1148,7 +1152,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // we've now got 2 zombie attempts, each with 9 tasks still active.  Submit the 3rd attempt for
     // the stage, but this time with insufficient resources so not all tasks are active.
 
-    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+    val finalAttempt = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 2)
     taskScheduler.submitTasks(finalAttempt)
     val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
     val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index d264ada..93a4b1f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1398,7 +1398,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(taskSetManager1.isZombie)
     assert(taskSetManager1.runningTasks === 9)
 
-    val taskSet2 = FakeTask.createTaskSet(10, stageAttemptId = 1)
+    val taskSet2 = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 1)
     sched.submitTasks(taskSet2)
     sched.resourceOffers(
       (11 until 20).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) })





[spark] branch master updated: [SPARK-27254][SS] Cleanup complete but invalid output files in ManifestFileCommitProtocol if job is aborted

2019-09-27 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d72f398  [SPARK-27254][SS] Cleanup complete but invalid output files in ManifestFileCommitProtocol if job is aborted
d72f398 is described below

commit d72f39897b00d0bbd7a4db9de281a1256fcf908d
Author: Jungtaek Lim (HeartSaVioR) 
AuthorDate: Fri Sep 27 12:35:26 2019 -0700

[SPARK-27254][SS] Cleanup complete but invalid output files in ManifestFileCommitProtocol if job is aborted

## What changes were proposed in this pull request?

SPARK-27210 enables ManifestFileCommitProtocol to clean up incomplete output files at the task level if a task is aborted.

This patch extends the scope of that cleanup: it makes ManifestFileCommitProtocol also clean up complete but invalid output files at the job level if the job aborts. Please note that this works as "best effort" rather than a guarantee, as we already have in HadoopMapReduceCommitProtocol.
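
The sketch below is not Spark's ManifestFileCommitProtocol; it is a self-contained toy (class and object names are made up) that only illustrates the best-effort pattern described above: record every file a task reports as committed, then delete those files on job abort if the manifest was never written.

```scala
import java.nio.file.{Files, Path}
import scala.collection.mutable.ArrayBuffer

object BestEffortCleanupSketch {
  // Toy stand-in for a commit protocol; names are illustrative only.
  class ToyCommitProtocol {
    private val pendingCommitFiles = ArrayBuffer.empty[Path]

    // Driver-side hook: remember the files reported by a committed task.
    def onTaskCommit(files: Seq[Path]): Unit = pendingCommitFiles ++= files

    // Once the files are about to be recorded in the manifest they must
    // survive, so the pending list is cleared before anything that can throw.
    def commitJob(): Unit = pendingCommitFiles.clear()

    // Best-effort cleanup: try to delete each recorded file, ignore failures.
    def abortJob(): Unit = {
      pendingCommitFiles.foreach { p =>
        try Files.deleteIfExists(p)
        catch { case e: java.io.IOException => println(s"could not delete $p: $e") }
      }
      pendingCommitFiles.clear()
    }
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("toy-sink")
    val protocol = new ToyCommitProtocol
    val written = (1 to 3).map(i => Files.createFile(dir.resolve(s"part-$i")))
    protocol.onTaskCommit(written)
    protocol.abortJob()  // simulate a job failure after the task committed
    println(s"files left behind: ${Files.list(dir).count()}")  // prints 0
  }
}
```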

## How was this patch tested?

Added UT.

Closes #24186 from HeartSaVioR/SPARK-27254.

Lead-authored-by: Jungtaek Lim (HeartSaVioR) 
Co-authored-by: Jungtaek Lim (HeartSaVioR) 
Signed-off-by: Shixiong Zhu 
---
 .../streaming/ManifestFileCommitProtocol.scala | 37 ++-
 .../spark/sql/streaming/FileStreamSinkSuite.scala  | 74 ++
 2 files changed, 109 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 916bd2d..f6cc811 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.IOException
 import java.util.UUID
 
 import scala.collection.mutable.ArrayBuffer
@@ -43,6 +44,8 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
   @transient private var fileLog: FileStreamSinkLog = _
   private var batchId: Long = _
 
+  @transient private var pendingCommitFiles: ArrayBuffer[Path] = _
+
   /**
    * Sets up the manifest log output and the batch id for this job.
    * Must be called before any other function.
@@ -54,13 +57,21 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
 
   override def setupJob(jobContext: JobContext): Unit = {
     require(fileLog != null, "setupManifestOptions must be called before this function")
-    // Do nothing
+    pendingCommitFiles = new ArrayBuffer[Path]
   }
 
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
     require(fileLog != null, "setupManifestOptions must be called before this function")
     val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[SinkFileStatus]]).toArray
 
+    // We shouldn't remove the files if they're written to the metadata:
+    // `fileLog.add(batchId, fileStatuses)` could fail AFTER writing files to the metadata
+    // as well as there could be race
+    // so for the safety we clean up the list before calling anything incurs exception.
+    // The case is uncommon and we do best effort instead of guarantee, so the simplicity of
+    // logic here would be OK, and safe for dealing with unexpected situations.
+    pendingCommitFiles.clear()
+
     if (fileLog.add(batchId, fileStatuses)) {
       logInfo(s"Committed batch $batchId")
     } else {
@@ -70,7 +81,29 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
 
   override def abortJob(jobContext: JobContext): Unit = {
     require(fileLog != null, "setupManifestOptions must be called before this function")
-    // Do nothing
+    // Best effort cleanup of complete files from failed job.
+    // Since the file has UUID in its filename, we are safe to try deleting them
+    // as the file will not conflict with file with another attempt on the same task.
+    if (pendingCommitFiles.nonEmpty) {
+      pendingCommitFiles.foreach { path =>
+        try {
+          val fs = path.getFileSystem(jobContext.getConfiguration)
+          // this is to make sure the file can be seen from driver as well
+          if (fs.exists(path)) {
+            fs.delete(path, false)
+          }
+        } catch {
+          case e: IOException =>
+            logWarning(s"Fail to remove temporary file $path, continue removing next.", e)
+        }
+      }
+      pendingCommitFiles.clear()
+    }
+  }
+
+  override def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {
+    pendingCommitFiles ++= taskCommit.obj.asInstanceOf[Seq[SinkFileStatus]]
+      .map(_.toFileStatus.getPath)
   }
 
   override def setupTask(tas

[spark] branch master updated (420abb4 -> 233c214)

2019-09-27 Thread vanzin
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 420abb4  [SPARK-29263][SCHEDULER] Update `availableSlots` in `resourceOffers()` before checking available slots for barrier taskSet
 add 233c214  [SPARK-29070][CORE] Make SparkLauncher log full spark-submit command line

No new revisions were added by this update.

Summary of changes:
 .../src/main/java/org/apache/spark/launcher/SparkLauncher.java | 8 ++++++++
 1 file changed, 8 insertions(+)





[spark] branch branch-2.4 updated: [SPARK-29263][SCHEDULER] Update `availableSlots` in `resourceOffers()` before checking available slots for barrier taskSet

2019-09-27 Thread jiangxb1987
This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 99e503c  [SPARK-29263][SCHEDULER] Update `availableSlots` in `resourceOffers()` before checking available slots for barrier taskSet
99e503c is described below

commit 99e503cebfd9cb19372c88b0dd70c6743f864454
Author: Juliusz Sompolski 
AuthorDate: Fri Sep 27 11:18:32 2019 -0700

[SPARK-29263][SCHEDULER] Update `availableSlots` in `resourceOffers()` before checking available slots for barrier taskSet

### What changes were proposed in this pull request?

`availableSlots` is computed before the for loop that iterates over all TaskSets in `resourceOffers()`. But the number of free slots changes in every iteration, because each iteration claims some of them. The number of available slots checked by a barrier task set therefore has to be recomputed from `availableCpus` in every iteration.

### Why are the changes needed?

Bugfix.
Without it, `resourceOffers` could attempt to start a barrier task set even though it does not have enough slots available. That would then be caught by the `require` on line 519, which throws an exception; the exception gets caught and ignored by the Dispatcher's MessageLoop, so nothing terrible happens, but it prevents `resourceOffers` from considering further TaskSets.
Note that launching the barrier TaskSet can still fail if other requirements are not satisfied, and it can still be rolled back by an exception thrown in this `require`. Handling that more gracefully remains a TODO in SPARK-24818, but this fix should at least resolve the case where the task set cannot launch because of insufficient slots.
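
To make the staleness concrete, here is a small self-contained sketch (made-up numbers, not Spark's scheduler code): a slot count computed once before the loop keeps reporting the original total, while a count recomputed from the remaining CPUs reflects what earlier task sets already claimed.

```scala
object AvailableSlotsSketch {
  val CPUS_PER_TASK = 1

  def main(args: Array[String]): Unit = {
    val availableCpus = Array(2, 2)                            // two executors, two cores each
    val staleSlots = availableCpus.map(_ / CPUS_PER_TASK).sum  // computed once, before the loop: 4

    // First task set needs 3 tasks, the second (think: barrier) needs 2.
    for (numTasks <- Seq(3, 2)) {
      val freshSlots = availableCpus.map(_ / CPUS_PER_TASK).sum  // recomputed per iteration
      println(s"needed=$numTasks staleSlots=$staleSlots freshSlots=$freshSlots")
      // Consume up to numTasks cpus, as launching the tasks would.
      var toTake = math.min(numTasks, freshSlots)
      for (i <- availableCpus.indices if toTake > 0) {
        val take = math.min(availableCpus(i), toTake)
        availableCpus(i) -= take
        toTake -= take
      }
    }
    // The second iteration prints staleSlots=4 but freshSlots=1: the stale
    // count would let a 2-task barrier set attempt to launch with only 1 free slot.
  }
}
```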

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Added UT

Closes #23375

Closes #25946 from juliuszsompolski/SPARK-29263.

Authored-by: Juliusz Sompolski 
Signed-off-by: Xingbo Jiang 
(cherry picked from commit 420abb457df0f422f73bab19a6ed6d7c6bab3173)
Signed-off-by: Xingbo Jiang 
---
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  2 +-
 .../org/apache/spark/scheduler/FakeTask.scala  | 36 +++
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 51 --
 3 files changed, 65 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index e194b79..38dbbe7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -391,7 +391,6 @@ private[spark] class TaskSchedulerImpl(
     // Build a list of tasks to assign to each worker.
     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
-    val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
     val sortedTaskSets = rootPool.getSortedTaskSetQueue
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
@@ -405,6 +404,7 @@ private[spark] class TaskSchedulerImpl(
     // of locality levels so that it gets a chance to launch local tasks on all of them.
     // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
     for (taskSet <- sortedTaskSets) {
+      val availableSlots = availableCpus.map(c => c / CPUS_PER_TASK).sum
       // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
       if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
         // Skip the launch process.
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index b29d32f..abc8841 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -42,15 +42,23 @@ object FakeTask {
    * locations for each task (given as varargs) if this sequence is not empty.
    */
   def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
-    createTaskSet(numTasks, stageAttemptId = 0, prefLocs: _*)
+    createTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0, prefLocs: _*)
   }
 
-  def createTaskSet(numTasks: Int, stageAttemptId: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
-    createTaskSet(numTasks, stageId = 0, stageAttemptId, prefLocs: _*)
+  def createTaskSet(
+      numTasks: Int,
+      stageId: Int,
+      stageAttemptId: Int,
+      prefLocs: Seq[TaskLocation]*): TaskSet = {
+    createTaskSet(numTasks, stageId, stageAttemptId, prior

[spark] branch master updated (fda0e6e -> 420abb4)

2019-09-27 Thread jiangxb1987
This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from fda0e6e  [SPARK-29240][PYTHON] Pass Py4J column instance to support PySpark column in element_at function
 add 420abb4  [SPARK-29263][SCHEDULER] Update `availableSlots` in `resourceOffers()` before checking available slots for barrier taskSet

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  2 +-
 .../org/apache/spark/scheduler/FakeTask.scala  | 36 +++
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 51 --
 3 files changed, 65 insertions(+), 24 deletions(-)





[spark] branch master updated (4bffcf5 -> fda0e6e)

2019-09-27 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 4bffcf5  [SPARK-29275][SQL][DOC] Describe special date/timestamp values in the SQL migration guide
 add fda0e6e  [SPARK-29240][PYTHON] Pass Py4J column instance to support PySpark column in element_at function

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/functions.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)





[spark] branch branch-2.4 updated: [SPARK-29240][PYTHON] Pass Py4J column instance to support PySpark column in element_at function

2019-09-27 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 361b605  [SPARK-29240][PYTHON] Pass Py4J column instance to support PySpark column in element_at function
361b605 is described below

commit 361b605eeb614e14977f81682d54ba94327280d3
Author: HyukjinKwon 
AuthorDate: Fri Sep 27 11:04:55 2019 -0700

[SPARK-29240][PYTHON] Pass Py4J column instance to support PySpark column in element_at function

### What changes were proposed in this pull request?

This PR makes `element_at` in PySpark able to take PySpark `Column` instances.

### Why are the changes needed?

To match the Scala side. This seems to have been intended but was not working correctly, i.e. a bug.
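
For context, the Scala API that PySpark is being brought in line with already accepts a `Column` for the extraction argument. A minimal local-mode sketch (assuming a Spark 2.4 runtime on the classpath; the object name is made up):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, element_at}

object ElementAtColumnExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("element_at-demo").getOrCreate()
    import spark.implicits._

    val df = Seq((Seq(1, 2, 3), 1), (Seq(4, 5, 6), 2), (Seq(7, 8, 9), 3)).toDF("list", "num")
    // Passing a Column (not just a literal) as the extraction works on the Scala side;
    // this produces the same table as the "After" output shown below.
    df.withColumn("aa", element_at(col("list"), col("num"))).show()

    spark.stop()
  }
}
```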

### Does this PR introduce any user-facing change?

Yes. See below:

```python
from pyspark.sql import functions as F
x = spark.createDataFrame([([1,2,3],1),([4,5,6],2),([7,8,9],3)],['list','num'])
x.withColumn('aa',F.element_at('list',x.num.cast('int'))).show()
```

Before:

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/functions.py", line 2059, in element_at
    return Column(sc._jvm.functions.element_at(_to_java_column(col), extraction))
  File "/.../spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1277, in __call__
  File "/.../spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1241, in _build_args
  File "/.../spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1228, in _get_args
  File "/.../forked/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_collections.py", line 500, in convert
  File "/.../spark/python/pyspark/sql/column.py", line 344, in __iter__
    raise TypeError("Column is not iterable")
TypeError: Column is not iterable
```

After:

```
+---------+---+---+
|     list|num| aa|
+---------+---+---+
|[1, 2, 3]|  1|  1|
|[4, 5, 6]|  2|  5|
|[7, 8, 9]|  3|  9|
+---------+---+---+
```

### How was this patch tested?

Manually tested against literal, Python native types, and PySpark column.

Closes #25950 from HyukjinKwon/SPARK-29240.

Authored-by: HyukjinKwon 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit fda0e6e48d00a1ba8e9d41d7670b3ad3c6951492)
Signed-off-by: Dongjoon Hyun 
---
 python/pyspark/sql/functions.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 3833746..069354e 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1990,11 +1990,12 @@ def element_at(col, extraction):
     [Row(element_at(data, 1)=u'a'), Row(element_at(data, 1)=None)]
 
     >>> df = spark.createDataFrame([({"a": 1.0, "b": 2.0},), ({},)], ['data'])
-    >>> df.select(element_at(df.data, "a")).collect()
+    >>> df.select(element_at(df.data, lit("a"))).collect()
     [Row(element_at(data, a)=1.0), Row(element_at(data, a)=None)]
     """
     sc = SparkContext._active_spark_context
-    return Column(sc._jvm.functions.element_at(_to_java_column(col), extraction))
+    return Column(sc._jvm.functions.element_at(
+        _to_java_column(col), lit(extraction)._jc))  # noqa: F821 'lit' is dynamically defined.
 
 
 @since(2.4)





[spark] branch master updated (cc852d4 -> 4bffcf5)

2019-09-27 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from cc852d4  [SPARK-29015][SQL][TEST-HADOOP3.2] Reset class loader after initializing SessionState for built-in Hive 2.3
 add 4bffcf5  [SPARK-29275][SQL][DOC] Describe special date/timestamp values in the SQL migration guide

No new revisions were added by this update.

Summary of changes:
 docs/sql-migration-guide.md | 14 ++++++++++++++
 1 file changed, 14 insertions(+)





[spark] branch master updated (4dd0066 -> cc852d4)

2019-09-27 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 4dd0066  [SPARK-21914][SQL][TESTS] Check results of expression examples
 add cc852d4  [SPARK-29015][SQL][TEST-HADOOP3.2] Reset class loader after initializing SessionState for built-in Hive 2.3

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala  | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)





[spark] branch master updated (bd28e8e -> 4dd0066)

2019-09-27 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from bd28e8e  [SPARK-29213][SQL] Generate extra IsNotNull predicate in FilterExec
 add 4dd0066  [SPARK-21914][SQL][TESTS] Check results of expression examples

No new revisions were added by this update.

Summary of changes:
 .../aggregate/ApproximatePercentile.scala  |  2 +-
 .../expressions/aggregate/CentralMomentAgg.scala   |  6 +--
 .../sql/catalyst/expressions/arithmetic.scala  |  2 +-
 .../catalyst/expressions/complexTypeCreator.scala  |  4 +-
 .../sql/catalyst/expressions/csvExpressions.scala  |  6 +--
 .../catalyst/expressions/datetimeExpressions.scala | 10 ++--
 .../sql/catalyst/expressions/generators.scala  | 24 -
 .../spark/sql/catalyst/expressions/grouping.scala  | 60 +++---
 .../expressions/higherOrderFunctions.scala |  2 +-
 .../sql/catalyst/expressions/jsonExpressions.scala | 10 ++--
 .../sql/catalyst/expressions/mathExpressions.scala |  4 +-
 .../catalyst/expressions/regexpExpressions.scala   | 15 +++---
 .../catalyst/expressions/stringExpressions.scala   | 14 ++---
 .../spark/sql/catalyst/expressions/xml/xpath.scala |  2 +-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 52 ++-
 15 files changed, 133 insertions(+), 80 deletions(-)





[spark] branch branch-2.4 updated: [SPARK-29213][SQL] Generate extra IsNotNull predicate in FilterExec

2019-09-27 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 3dbe065  [SPARK-29213][SQL] Generate extra IsNotNull predicate in FilterExec
3dbe065 is described below

commit 3dbe06561c3d645182b1b512ae8d545056b7613b
Author: Wang Shuo 
AuthorDate: Fri Sep 27 15:14:17 2019 +0800

[SPARK-29213][SQL] Generate extra IsNotNull predicate in FilterExec

Currently, the way `FilterExec` computes its output and the way it generates null checks are inconsistent, so some nullable attributes could be treated as not nullable by mistake.

In `FilterExec.output`, an attribute is marked as nullable or not by looking up its `exprId` in notNullAttributes:
```
a.nullable && notNullAttributes.contains(a.exprId)
```
But in `FilterExec.doConsume`, whether a `nullCheck` is generated for a predicate is decided by whether there is a semantically equal not-null predicate:
```
  val nullChecks = c.references.map { r =>
    val idx = notNullPreds.indexWhere { n => n.asInstanceOf[IsNotNull].child.semanticEquals(r)}
    if (idx != -1 && !generatedIsNotNullChecks(idx)) {
      generatedIsNotNullChecks(idx) = true
      // Use the child's output. The nullability is what the child produced.
      genPredicate(notNullPreds(idx), input, child.output)
    } else {
      ""
    }
  }.mkString("\n").trim
```
An NPE will happen when running the SQL below:
```
sql("create table table1(x string)")
sql("create table table2(x bigint)")
sql("create table table3(x string)")
sql("insert into table2 select null as x")
sql(
  """
|select t1.x
|from (
|select x from table1) t1
|left join (
|select x from (
|select x from table2
|union all
|select substr(x,5) x from table3
|) a
|where length(x)>0
|) t3
|on t1.x=t3.x
  """.stripMargin).collect()
```
NPE Exception:
```
java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:40)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:135)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:94)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:449)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:452)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
The generated code:
```
== Subtree 4 / 5 ==
*(2) Project [cast(x#7L as string) AS x#9]
+- *(2) Filter ((length(cast(x#7L as string)) > 0) AND isnotnull(cast(x#7L as string)))
   +- Scan hive default.table2 [x#7L], HiveTableRelation `default`.`table2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [x#7L]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage2(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=2
/* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator inputadapter_input_0;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] filter_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 011 */
/* 012 */   public GeneratedIteratorForCodegenStage2(Object[] references) {
/* 013 */     this.references = references;
/* 014 */   }
/* 015 */
/* 016 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 017 */     partitionIndex = index;
/* 018 *

[spark] branch master updated (aed7ff3 -> bd28e8e)

2019-09-27 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from aed7ff3  [SPARK-29258][ML][PYSPARK] parity between ml.evaluator and mllib.metrics
 add bd28e8e  [SPARK-29213][SQL] Generate extra IsNotNull predicate in FilterExec

No new revisions were added by this update.

Summary of changes:
 .../sql/execution/basicPhysicalOperators.scala |  5 +
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 26 ++
 2 files changed, 31 insertions(+)

