[spark] branch master updated (731a60c -> c397b06)

2019-06-25 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 731a60c  [SPARK-27823][CORE][DOC][FOLLOWUP] Update doc of config `spark.driver.resourcesFile`
 add c397b06  [SPARK-28045][ML][PYTHON] add missing RankingEvaluator

No new revisions were added by this update.

Summary of changes:
 .../spark/ml/evaluation/RankingEvaluator.scala | 118 +
 ...atorSuite.scala => RankingEvaluatorSuite.scala} |  46 
 python/pyspark/ml/evaluation.py | 95 -
 3 files changed, 235 insertions(+), 24 deletions(-)
 create mode 100644 
mllib/src/main/scala/org/apache/spark/ml/evaluation/RankingEvaluator.scala
 copy 
mllib/src/test/scala/org/apache/spark/ml/evaluation/{MultilabelClassificationEvaluatorSuite.scala
 => RankingEvaluatorSuite.scala} (61%)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (c397b06 -> 83b96f6)

2019-06-25 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from c397b06  [SPARK-28045][ML][PYTHON] add missing RankingEvaluator
 add 83b96f6  [SPARK-28117][ML] LDA and BisectingKMeans cache the input dataset if necessary

No new revisions were added by this update.

Summary of changes:
 .../spark/ml/clustering/BisectingKMeans.scala  | 10 +
 .../scala/org/apache/spark/ml/clustering/LDA.scala | 25 +-
 .../spark/mllib/clustering/LDAOptimizer.scala  |  6 +-
 .../spark/ml/clustering/BisectingKMeansSuite.scala |  2 +-
 4 files changed, 36 insertions(+), 7 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (83b96f6 -> c83b3dd)

2019-06-25 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 83b96f6  [SPARK-28117][ML] LDA and BisectingKMeans cache the input dataset if necessary
 add c83b3dd  [SPARK-28154][ML][FOLLOWUP] GMM fix double caching

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/ml/clustering/GaussianMixture.scala | 12 ++--
 1 file changed, 10 insertions(+), 2 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: [SPARK-28154][ML][FOLLOWUP] GMM fix double caching

2019-06-25 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new eb97f95  [SPARK-28154][ML][FOLLOWUP] GMM fix double caching
eb97f95 is described below

commit eb97f952f6c54af4f192c7bf85a09612000ff793
Author: zhengruifeng 
AuthorDate: Tue Jun 25 06:50:34 2019 -0500

[SPARK-28154][ML][FOLLOWUP] GMM fix double caching

If the input dataset is already cached, then we do not need to cache the internal RDD (as is done in KMeans).

Tested with the existing tests.

Closes #24919 from zhengruifeng/gmm_fix_double_caching.

Authored-by: zhengruifeng 
Signed-off-by: Sean Owen 
(cherry picked from commit c83b3ddb56d4a32158676b042a7eae861689e141)
Signed-off-by: Sean Owen 
---
 .../org/apache/spark/ml/clustering/GaussianMixture.scala | 12 ++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala 
b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
index 88abc16..6c0b49c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
@@ -36,6 +36,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.functions.udf
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.storage.StorageLevel
 
 
 /**
@@ -342,10 +343,15 @@ class GaussianMixture @Since("2.0.0") (
 val sc = dataset.sparkSession.sparkContext
 val numClusters = $(k)
 
+val handlePersistence = dataset.storageLevel == StorageLevel.NONE
 val instances = dataset
   .select(DatasetUtils.columnToVector(dataset, getFeaturesCol)).rdd.map {
   case Row(features: Vector) => features
-}.cache()
+}
+
+if (handlePersistence) {
+  instances.persist(StorageLevel.MEMORY_AND_DISK)
+}
 
 // Extract the number of features.
 val numFeatures = instances.first().size
@@ -422,8 +428,10 @@ class GaussianMixture @Since("2.0.0") (
   logLikelihood = sums.logLikelihood  // this is the freshly computed 
log-likelihood
   iter += 1
 }
+if (handlePersistence) {
+  instances.unpersist()
+}
 
-instances.unpersist(false)
 val gaussianDists = gaussians.map { case (mean, covVec) =>
   val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, 
covVec.values)
   new MultivariateGaussian(mean, cov)
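
For reference, a minimal standalone sketch of the conditional-caching pattern applied in this change, written against the public Dataset/RDD/StorageLevel APIs. The wrapper name `withConditionalCache` and the `trainOnRdd` parameter are illustrative only, not part of the patch.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Dataset
    import org.apache.spark.storage.StorageLevel

    object CacheSketch {
      // Persist the derived RDD only when the source dataset is not already
      // cached, and unpersist it only if we were the ones who cached it.
      def withConditionalCache[T, R](dataset: Dataset[_], rdd: RDD[T])(trainOnRdd: RDD[T] => R): R = {
        val handlePersistence = dataset.storageLevel == StorageLevel.NONE
        if (handlePersistence) {
          rdd.persist(StorageLevel.MEMORY_AND_DISK)
        }
        try {
          trainOnRdd(rdd)
        } finally {
          if (handlePersistence) {
            rdd.unpersist()
          }
        }
      }
    }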


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-26985][CORE] Fix "access only some column of the all of columns " for big endian architecture BUG

2019-06-25 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1a3858a  [SPARK-26985][CORE] Fix "access only some column of the all of columns " for big endian architecture BUG
1a3858a is described below

commit 1a3858a7694a189f434846b132b828e902273620
Author: ketank-new 
AuthorDate: Tue Jun 25 08:24:10 2019 -0500

[SPARK-26985][CORE] Fix "access only some column of the all of columns " for big endian architecture BUG

Continuation of https://github.com/apache/spark/pull/24788.

## What changes were proposed in this pull request?

These changes target big-endian systems. They:

1. identify the s390x platform, and
2. set the byte order to BIG_ENDIAN on big-endian systems.

The changes for (2) are made in the access functions putFloats() and putDouble().
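
As an illustration only (not the patch itself), a short Scala sketch of reading floats from a byte array with an explicit byte order: the patch hard-codes BIG_ENDIAN in the affected branches, while this sketch uses the JVM's reported native order, and the helper name `readFloats` is hypothetical.

    import java.nio.{ByteBuffer, ByteOrder}

    object ByteOrderSketch {
      // Read `count` floats from `src` starting at byte offset `srcIndex`,
      // interpreting the bytes in the platform's native order (BIG_ENDIAN on
      // s390x and ppc64, LITTLE_ENDIAN on x86/ppc64le).
      def readFloats(src: Array[Byte], srcIndex: Int, count: Int): Array[Float] = {
        val bb = ByteBuffer.wrap(src).order(ByteOrder.nativeOrder())
        Array.tabulate(count)(i => bb.getFloat(srcIndex + 4 * i))
      }
    }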

## How was this patch tested?

The changes have been verified to build successfully on both s390x and x86 platforms.

Closes #24861 from ketank-new/ketan_latest_v2.3.2.

Authored-by: ketank-new 
Signed-off-by: Sean Owen 
---
 common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java | 2 +-
 .../apache/spark/sql/execution/vectorized/OffHeapColumnVector.java| 4 ++--
 .../org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index d98f08f..dc8d6e3 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -304,7 +304,7 @@ public final class Platform {
   static {
 boolean _unaligned;
 String arch = System.getProperty("os.arch", "");
-if (arch.equals("ppc64le") || arch.equals("ppc64")) {
+if (arch.equals("ppc64le") || arch.equals("ppc64") || 
arch.equals("s390x")) {
   // Since java.nio.Bits.unaligned() doesn't return true on ppc (See 
JDK-8165231), but
   // ppc64 and ppc64le support it
   _unaligned = true;
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 5e0cf7d..3b919c7 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -417,7 +417,7 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
   null, data + rowId * 4L, count * 4L);
 } else {
-  ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+  ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.BIG_ENDIAN);
   long offset = data + 4L * rowId;
   for (int i = 0; i < count; ++i, offset += 4) {
 Platform.putFloat(null, offset, bb.getFloat(srcIndex + (4 * i)));
@@ -472,7 +472,7 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
 null, data + rowId * 8L, count * 8L);
 } else {
-  ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+  ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.BIG_ENDIAN);
   long offset = data + 8L * rowId;
   for (int i = 0; i < count; ++i, offset += 8) {
 Platform.putDouble(null, offset, bb.getDouble(srcIndex + (8 * i)));
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 577eab6..1bf3126 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -396,7 +396,7 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
   Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, 
floatData,
   Platform.DOUBLE_ARRAY_OFFSET + rowId * 4L, count * 4L);
 } else {
-  ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+  ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.BIG_ENDIAN);
   for (int i = 0; i < count; ++i) {
 floatData[i + rowId] = bb.getFloat(srcIndex + (4 * i));
   }
@@ -445,7 +445,7 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
   Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, 
doubleData,
   Platform.DOUBLE_ARRAY_OFFSET + rowId * 8L,

[spark] branch master updated (1a3858a -> b71c130)

2019-06-25 Thread vanzin
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 1a3858a  [SPARK-26985][CORE] Fix "access only some column of the all of columns " for big endian architecture BUG
 add b71c130  [SPARK-27622][CORE] Avoiding the network when block manager fetches disk persisted RDD blocks from the same host

No new revisions were added by this update.

Summary of changes:
 .../spark/network/shuffle/ExecutorDiskUtils.java   |  66 +++
 .../shuffle/ExternalShuffleBlockResolver.java  |  45 +---
 .../shuffle/ExternalShuffleBlockResolverSuite.java |   2 +-
 .../network/shuffle/TestShuffleDataContext.java|   9 +-
 .../org/apache/spark/storage/BlockManager.scala| 121 
 .../apache/spark/storage/BlockManagerMaster.scala  |  13 ++-
 .../spark/storage/BlockManagerMasterEndpoint.scala |  32 --
 .../spark/storage/BlockManagerMessages.scala   |  17 ++-
 .../apache/spark/storage/DiskBlockManager.scala|   5 +-
 .../spark/storage/BlockManagerInfoSuite.scala  |   1 +
 .../apache/spark/storage/BlockManagerSuite.scala   | 124 +++--
 11 files changed, 342 insertions(+), 93 deletions(-)
 create mode 100644 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskUtils.java


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-27630][CORE] Properly handle task end events from completed stages

2019-06-25 Thread irashid
This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 38263f6  [SPARK-27630][CORE] Properly handle task end events from completed stages
38263f6 is described below

commit 38263f6d153944b6f2f0248d9284861fc82532d6
Author: sychen 
AuthorDate: Tue Jun 25 14:30:13 2019 -0500

[SPARK-27630][CORE] Properly handle task end events from completed stages

## What changes were proposed in this pull request?
Track tasks separately for each stage attempt (instead of tracking by stage), and do NOT reset numRunningTasks to 0 on StageCompleted.

When a stage is retried, a `taskEnd` event from the zombie stage attempt can sometimes make `totalRunningTasks` negative, which causes the job to get stuck.
A similar problem exists for `stageIdToTaskIndices` and `stageIdToSpeculativeTaskIndices`: a failed `taskEnd` event from the zombie stage can remove the task index of the active stage from these maps, so `totalPendingTasks` increases unexpectedly.
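
A minimal sketch (illustrative, not the actual ExecutorAllocationManager code) of the bookkeeping change described above: keying the counters by (stageId, stageAttemptId) so that a late `taskEnd` event from a zombie attempt cannot corrupt the counts of the retried attempt. The object and method names here are hypothetical.

    import scala.collection.mutable

    object StageAttemptTrackingSketch {
      case class StageAttempt(stageId: Int, stageAttemptId: Int)

      // Number of running tasks per stage attempt.
      private val runningTasks = new mutable.HashMap[StageAttempt, Int]

      def onTaskStart(stageId: Int, attemptId: Int): Unit = {
        val key = StageAttempt(stageId, attemptId)
        runningTasks(key) = runningTasks.getOrElse(key, 0) + 1
      }

      def onTaskEnd(stageId: Int, attemptId: Int): Unit = {
        val key = StageAttempt(stageId, attemptId)
        // Decrement only if this attempt is still tracked; an event from an
        // attempt that was already cleaned up is ignored instead of driving
        // the total negative.
        runningTasks.get(key).foreach { n =>
          if (n <= 1) runningTasks.remove(key) else runningTasks(key) = n - 1
        }
      }
    }
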
## How was this patch tested?
A unit test covering task end events from completed stages.

Closes #24497 from cxzl25/fix_stuck_job_follow_up.

Authored-by: sychen 
Signed-off-by: Imran Rashid 
---
 .../apache/spark/ExecutorAllocationManager.scala   | 113 -
 .../org/apache/spark/scheduler/DAGScheduler.scala  |   2 +-
 .../org/apache/spark/scheduler/SparkListener.scala |   5 +-
 .../spark/ExecutorAllocationManagerSuite.scala |  33 +-
 project/MimaExcludes.scala |   6 ++
 5 files changed, 104 insertions(+), 55 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index bb95fea..bceb26c 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -491,6 +491,10 @@ private[spark] class ExecutorAllocationManager(
 numExecutorsToAdd = 1
   }
 
+  private case class StageAttempt(stageId: Int, stageAttemptId: Int) {
+override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)"
+  }
+
   /**
* A listener that notifies the given allocation manager of when to add and 
remove executors.
*
@@ -499,29 +503,32 @@ private[spark] class ExecutorAllocationManager(
*/
   private[spark] class ExecutorAllocationListener extends SparkListener {
 
-private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
-// Number of running tasks per stage including speculative tasks.
+private val stageAttemptToNumTasks = new mutable.HashMap[StageAttempt, Int]
+// Number of running tasks per stageAttempt including speculative tasks.
 // Should be 0 when no stages are active.
-private val stageIdToNumRunningTask = new mutable.HashMap[Int, Int]
-private val stageIdToTaskIndices = new mutable.HashMap[Int, 
mutable.HashSet[Int]]
-// Number of speculative tasks to be scheduled in each stage
-private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
-// The speculative tasks started in each stage
-private val stageIdToSpeculativeTaskIndices = new mutable.HashMap[Int, 
mutable.HashSet[Int]]
-
-// stageId to tuple (the number of task with locality preferences, a map 
where each pair is a
-// node and the number of tasks that would like to be scheduled on that 
node) map,
-// maintain the executor placement hints for each stage Id used by 
resource framework to better
-// place the executors.
-private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
+private val stageAttemptToNumRunningTask = new 
mutable.HashMap[StageAttempt, Int]
+private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, 
mutable.HashSet[Int]]
+// Number of speculative tasks to be scheduled in each stageAttempt
+private val stageAttemptToNumSpeculativeTasks = new 
mutable.HashMap[StageAttempt, Int]
+// The speculative tasks started in each stageAttempt
+private val stageAttemptToSpeculativeTaskIndices =
+  new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
+
+// stageAttempt to tuple (the number of task with locality preferences, a 
map where each pair
+// is a node and the number of tasks that would like to be scheduled on 
that node) map,
+// maintain the executor placement hints for each stageAttempt used by 
resource framework
+// to better place the executors.
+private val stageAttemptToExecutorPlacementHints =
+  new mutable.HashMap[StageAttempt, (Int, Map[String, Int])]
 
 override def onStageSubm

[spark] branch master updated: [SPARK-27676][SQL][SS] InMemoryFileIndex should respect spark.sql.files.ignoreMissingFiles

2019-06-25 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d83f84a  [SPARK-27676][SQL][SS] InMemoryFileIndex should respect spark.sql.files.ignoreMissingFiles
d83f84a is described below

commit d83f84a1229138b7935b4b18da80a96d3c5c3dde
Author: Josh Rosen 
AuthorDate: Wed Jun 26 09:11:28 2019 +0900

[SPARK-27676][SQL][SS] InMemoryFileIndex should respect spark.sql.files.ignoreMissingFiles

## What changes were proposed in this pull request?

Spark's `InMemoryFileIndex` contains two places where `FileNotFound` 
exceptions are caught and logged as warnings (during [directory 
listing](https://github.com/apache/spark/blob/bcd3b61c4be98565352491a108e6394670a0f413/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L274)
 and [block location 
lookup](https://github.com/apache/spark/blob/bcd3b61c4be98565352491a108e6394670a0f413/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources
 [...]

I think that this is a dangerous default behavior because it can mask bugs 
caused by race conditions (e.g. overwriting a table while it's being read) or 
S3 consistency issues (there's more discussion on this in the [JIRA 
ticket](https://issues.apache.org/jira/browse/SPARK-27676)). Failing fast when 
we detect missing files is not sufficient to make concurrent table reads/writes 
or S3 listing safe (there are other classes of eventual consistency issues to 
worry about), but I think it's  [...]

There may be some cases where users _do_ want to ignore missing files, but I think that should be an opt-in behavior
via the existing `spark.sql.files.ignoreMissingFiles` flag (the current behavior is itself race-prone because a file
might be deleted between catalog listing and query execution time, triggering FileNotFoundExceptions on executors
(which are handled in a way that _does_ respect `ignoreMissingFiles`)).

This PR updates `InMemoryFileIndex` to guard the 
log-and-ignore-FileNotFoundException behind the existing 
`spark.sql.files.ignoreMissingFiles` flag.

**Note**: this is a change of default behavior, so I think it needs to be 
mentioned in release notes.
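
A hedged sketch of the guarded-listing pattern described above (not the actual InMemoryFileIndex code): `ignoreMissingFiles` stands in for the value of `spark.sql.files.ignoreMissingFiles`, and `listLeafFiles`/`list` are hypothetical helpers.

    import java.io.FileNotFoundException

    object IgnoreMissingFilesSketch {
      // Run the listing; if the path vanished mid-listing, either swallow the
      // error (opt-in via the flag) or fail fast (the new default behavior).
      def listLeafFiles[T](path: String, ignoreMissingFiles: Boolean)(list: String => Seq[T]): Seq[T] = {
        try {
          list(path)
        } catch {
          case _: FileNotFoundException if ignoreMissingFiles =>
            // Previously this was always logged and ignored; now it is opt-in.
            Seq.empty[T]
        }
      }
    }

A user who wants the pre-3.0 behavior back can enable the existing flag, e.g. spark.conf.set("spark.sql.files.ignoreMissingFiles", "true").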

## How was this patch tested?

New unit tests to simulate file-deletion race conditions, tested with both values of the `ignoreMissingFiles` flag.

Closes #24668 from JoshRosen/SPARK-27676.

Lead-authored-by: Josh Rosen 
Co-authored-by: Josh Rosen 
Signed-off-by: HyukjinKwon 
---
 docs/sql-migration-guide-upgrade.md | 2 +
 .../spark/sql/execution/command/CommandUtils.scala |   2 +-
 .../execution/datasources/InMemoryFileIndex.scala  |  71 ++--
 .../sql/execution/datasources/FileIndexSuite.scala | 126 -
 4 files changed, 188 insertions(+), 13 deletions(-)

diff --git a/docs/sql-migration-guide-upgrade.md 
b/docs/sql-migration-guide-upgrade.md
index b062a04..c920f06 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -145,6 +145,8 @@ license: |
 
   - Since Spark 3.0, a higher-order function `exists` follows the three-valued 
boolean logic, i.e., if the `predicate` returns any `null`s and no `true` is 
obtained, then `exists` will return `null` instead of `false`. For example, 
`exists(array(1, null, 3), x -> x % 2 == 0)` will be `null`. The previous 
behaviour can be restored by setting 
`spark.sql.legacy.arrayExistsFollowsThreeValuedLogic` to `false`.
 
+  - Since Spark 3.0, if files or subdirectories disappear during recursive 
directory listing (i.e. they appear in an intermediate listing but then cannot 
be read or listed during later phases of the recursive directory listing, due 
to either concurrent file deletions or object store consistency issues) then 
the listing will fail with an exception unless 
`spark.sql.files.ignoreMissingFiles` is `true` (default `false`). In previous 
versions, these missing files or subdirectories would be i [...]
+
 ## Upgrading from Spark SQL 2.4 to 2.4.1
 
   - The value of `spark.executor.heartbeatInterval`, when specified without 
units like "30" rather than "30s", was
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index cac2519..b644e6d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -67,7 +67,7 @@ object CommandUtils extends Logging {
   override def accept(path: Path): Boolean = isDataPath(path, 
stagingDir)
 }
 val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
-  paths, sessionState.newHadoopC