svn commit: r24111 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_10_00_01-70bcc9d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-01-10 Thread pwendell
Author: pwendell
Date: Wed Jan 10 08:16:20 2018
New Revision: 24111

Log:
Apache Spark 2.3.0-SNAPSHOT-2018_01_10_00_01-70bcc9d docs


[This commit notification would consist of 1439 parts,
which exceeds the limit of 50, so it was shortened to a summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-22997] Add additional defenses against use of freed MemoryBlocks

2018-01-10 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 70bcc9d5a -> f340b6b30


[SPARK-22997] Add additional defenses against use of freed MemoryBlocks

## What changes were proposed in this pull request?

This patch modifies Spark's `MemoryAllocator` implementations so that 
`free(MemoryBlock)` mutates the passed block to clear pointers (in the off-heap 
case) or null out references to backing `long[]` arrays (in the on-heap case). 
The goal of this change is to add an extra layer of defense against 
use-after-free bugs because currently it's hard to detect corruption caused by 
blind writes to freed memory blocks.
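
To illustrate the idea in isolation, here is a minimal Scala sketch with hypothetical names (not Spark's actual `MemoryAllocator`/`MemoryBlock` classes): on free, the allocator drops the block's reference to its backing storage and stamps a sentinel marker, so a later write or a second free trips an assertion instead of silently corrupting memory that may have been reused.

```scala
object FreedMarker {
  // Hypothetical sentinel; the real patch uses dedicated page-number constants.
  val FREED: Long = -3L
}

// Toy stand-in for a memory block backed by a long[] array.
class Block(var backing: Array[Long], var marker: Long = 0L) {
  def write(i: Int, v: Long): Unit = {
    assert(marker != FreedMarker.FREED, "use-after-free: block was already freed")
    backing(i) = v
  }
}

class Allocator {
  def free(block: Block): Unit = {
    assert(block.marker != FreedMarker.FREED, "double free detected")
    block.backing = null             // null out the reference to the backing array
    block.marker = FreedMarker.FREED // mark the block so later accesses fail fast
  }
}
```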

## How was this patch tested?

New unit tests in `PlatformSuite`, including new tests for existing 
functionality because we did not have sufficient mutation coverage of the 
on-heap memory allocator's pooling logic.

Author: Josh Rosen 

Closes #20191 from 
JoshRosen/SPARK-22997-add-defenses-against-use-after-free-bugs-in-memory-allocator.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f340b6b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f340b6b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f340b6b3

Branch: refs/heads/master
Commit: f340b6b3066033d40b7e163fd5fb68e9820adfb1
Parents: 70bcc9d
Author: Josh Rosen 
Authored: Wed Jan 10 00:45:47 2018 -0800
Committer: Josh Rosen 
Committed: Wed Jan 10 00:45:47 2018 -0800

--
 .../unsafe/memory/HeapMemoryAllocator.java  | 35 ++
 .../apache/spark/unsafe/memory/MemoryBlock.java | 21 +++-
 .../unsafe/memory/UnsafeMemoryAllocator.java| 11 +
 .../apache/spark/unsafe/PlatformUtilSuite.java  | 50 +++-
 .../apache/spark/memory/TaskMemoryManager.java  | 13 -
 .../spark/memory/TaskMemoryManagerSuite.java| 29 
 6 files changed, 146 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f340b6b3/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
--
diff --git 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
index cc9cc42..3acfe36 100644
--- 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
+++ 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
@@ -31,8 +31,7 @@ import org.apache.spark.unsafe.Platform;
 public class HeapMemoryAllocator implements MemoryAllocator {
 
   @GuardedBy("this")
-  private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> bufferPoolsBySize =
-    new HashMap<>();
+  private final Map<Long, LinkedList<WeakReference<long[]>>> bufferPoolsBySize = new HashMap<>();
 
   private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;
 
@@ -49,13 +48,14 @@ public class HeapMemoryAllocator implements MemoryAllocator {
   public MemoryBlock allocate(long size) throws OutOfMemoryError {
     if (shouldPool(size)) {
       synchronized (this) {
-        final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
+        final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
         if (pool != null) {
           while (!pool.isEmpty()) {
-            final WeakReference<MemoryBlock> blockReference = pool.pop();
-            final MemoryBlock memory = blockReference.get();
-            if (memory != null) {
-              assert (memory.size() == size);
+            final WeakReference<long[]> arrayReference = pool.pop();
+            final long[] array = arrayReference.get();
+            if (array != null) {
+              assert (array.length * 8L >= size);
+              MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
               if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
                 memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
               }
@@ -76,18 +76,35 @@ public class HeapMemoryAllocator implements MemoryAllocator {
 
   @Override
   public void free(MemoryBlock memory) {
+    assert (memory.obj != null) :
+      "baseObject was null; are you trying to use the on-heap allocator to free off-heap memory?";
+    assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+      "page has already been freed";
+    assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
+        || (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
+      "TMM-allocated pages must first be freed via TMM.freePage(), not directly in allocator free()";
+
     final long size = memory.size();
     if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
       memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
     }
+
+    // Mark the page as freed (so we can detect double-frees).
+    memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUM

spark git commit: [SPARK-22997] Add additional defenses against use of freed MemoryBlocks

2018-01-10 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 2db523959 -> 60d4d79bb


[SPARK-22997] Add additional defenses against use of freed MemoryBlocks

## What changes were proposed in this pull request?

This patch modifies Spark's `MemoryAllocator` implementations so that 
`free(MemoryBlock)` mutates the passed block to clear pointers (in the off-heap 
case) or null out references to backing `long[]` arrays (in the on-heap case). 
The goal of this change is to add an extra layer of defense against 
use-after-free bugs because currently it's hard to detect corruption caused by 
blind writes to freed memory blocks.

## How was this patch tested?

New unit tests in `PlatformSuite`, including new tests for existing 
functionality because we did not have sufficient mutation coverage of the 
on-heap memory allocator's pooling logic.

Author: Josh Rosen 

Closes #20191 from 
JoshRosen/SPARK-22997-add-defenses-against-use-after-free-bugs-in-memory-allocator.

(cherry picked from commit f340b6b3066033d40b7e163fd5fb68e9820adfb1)
Signed-off-by: Josh Rosen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60d4d79b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60d4d79b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60d4d79b

Branch: refs/heads/branch-2.3
Commit: 60d4d79bb40f13c68773a0224f2003cdca28c138
Parents: 2db5239
Author: Josh Rosen 
Authored: Wed Jan 10 00:45:47 2018 -0800
Committer: Josh Rosen 
Committed: Wed Jan 10 00:46:27 2018 -0800

--
 .../unsafe/memory/HeapMemoryAllocator.java  | 35 ++
 .../apache/spark/unsafe/memory/MemoryBlock.java | 21 +++-
 .../unsafe/memory/UnsafeMemoryAllocator.java| 11 +
 .../apache/spark/unsafe/PlatformUtilSuite.java  | 50 +++-
 .../apache/spark/memory/TaskMemoryManager.java  | 13 -
 .../spark/memory/TaskMemoryManagerSuite.java| 29 
 6 files changed, 146 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/60d4d79b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
--
diff --git 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
index cc9cc42..3acfe36 100644
--- 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
+++ 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
@@ -31,8 +31,7 @@ import org.apache.spark.unsafe.Platform;
 public class HeapMemoryAllocator implements MemoryAllocator {
 
   @GuardedBy("this")
-  private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> bufferPoolsBySize =
-    new HashMap<>();
+  private final Map<Long, LinkedList<WeakReference<long[]>>> bufferPoolsBySize = new HashMap<>();
 
   private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;
 
@@ -49,13 +48,14 @@ public class HeapMemoryAllocator implements MemoryAllocator {
   public MemoryBlock allocate(long size) throws OutOfMemoryError {
     if (shouldPool(size)) {
       synchronized (this) {
-        final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
+        final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
         if (pool != null) {
           while (!pool.isEmpty()) {
-            final WeakReference<MemoryBlock> blockReference = pool.pop();
-            final MemoryBlock memory = blockReference.get();
-            if (memory != null) {
-              assert (memory.size() == size);
+            final WeakReference<long[]> arrayReference = pool.pop();
+            final long[] array = arrayReference.get();
+            if (array != null) {
+              assert (array.length * 8L >= size);
+              MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
               if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
                 memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
               }
@@ -76,18 +76,35 @@ public class HeapMemoryAllocator implements MemoryAllocator {
 
   @Override
   public void free(MemoryBlock memory) {
+    assert (memory.obj != null) :
+      "baseObject was null; are you trying to use the on-heap allocator to free off-heap memory?";
+    assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+      "page has already been freed";
+    assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
+        || (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
+      "TMM-allocated pages must first be freed via TMM.freePage(), not directly in allocator free()";
+
     final long size = memory.size();
     if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
       memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
     }
+
+    // Mark the page a

svn commit: r24114 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_10_04_01-f340b6b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-01-10 Thread pwendell
Author: pwendell
Date: Wed Jan 10 12:15:16 2018
New Revision: 24114

Log:
Apache Spark 2.3.0-SNAPSHOT-2018_01_10_04_01-f340b6b docs


[This commit notification would consist of 1439 parts,
which exceeds the limit of 50, so it was shortened to a summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-22972] Couldn't find corresponding Hive SerDe for data source provider org.apache.spark.sql.hive.orc

2018-01-10 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 24f1f2a54 -> 0d943d96b


[SPARK-22972] Couldn't find corresponding Hive SerDe for data source provider 
org.apache.spark.sql.hive.orc

## What changes were proposed in this pull request?

Fixes the warning "Couldn't find corresponding Hive SerDe for data source
provider org.apache.spark.sql.hive.orc".
This PR is for branch-2.2 and is cherry-picked from
https://github.com/apache/spark/commit/8032cf852fccd0ab8754f633affdc9ba8fc99e58

The old PR is https://github.com/apache/spark/pull/20165
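
For context, here is a standalone sketch (simplified and hypothetical, not the real `HiveSerDe.sourceToSerDe`) of the prefix-based lookup this patch extends. Before the fix, the provider `org.apache.spark.sql.hive.orc` matched neither existing prefix and fell through, which produced the warning above.

```scala
import java.util.Locale

// Hypothetical helper mirroring the shape of the provider-to-SerDe key lookup.
def serdeKey(source: String): Option[String] = {
  val s = source.toLowerCase(Locale.ROOT)
  if (s.startsWith("org.apache.spark.sql.parquet")) Some("parquet")
  else if (s.startsWith("org.apache.spark.sql.orc")) Some("orc")
  else if (s.startsWith("org.apache.spark.sql.hive.orc")) Some("orc") // the case this PR adds
  else if (s == "orcfile") Some("orc")
  else if (s == "parquetfile") Some("parquet")
  else None
}

// serdeKey("org.apache.spark.sql.hive.orc")  // Some("orc") after the fix; None before
```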

## How was this patch tested?

Please see test("SPARK-22972: hive orc source").

Author: xubo245 <601450...@qq.com>

Closes #20195 from xubo245/HiveSerDeForBranch2.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d943d96
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d943d96
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d943d96

Branch: refs/heads/branch-2.2
Commit: 0d943d96b3e2cbd663177e8cab2829fefd18411a
Parents: 24f1f2a
Author: xubo245 <601450...@qq.com>
Authored: Wed Jan 10 23:27:45 2018 +0800
Committer: gatorsmile 
Committed: Wed Jan 10 23:27:45 2018 +0800

--
 .../apache/spark/sql/internal/HiveSerDe.scala   |  1 +
 .../spark/sql/hive/orc/OrcSourceSuite.scala | 32 +++-
 2 files changed, 32 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0d943d96/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
index b9515ec..dac4636 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
@@ -73,6 +73,7 @@ object HiveSerDe {
 val key = source.toLowerCase(Locale.ROOT) match {
   case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet"
   case s if s.startsWith("org.apache.spark.sql.orc") => "orc"
+  case s if s.startsWith("org.apache.spark.sql.hive.orc") => "orc"
   case s if s.equals("orcfile") => "orc"
   case s if s.equals("parquetfile") => "parquet"
   case s if s.equals("avrofile") => "avro"

http://git-wip-us.apache.org/repos/asf/spark/blob/0d943d96/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 6bfb88c..a562de4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -22,9 +22,12 @@ import java.io.File
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.sources._
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -197,7 +200,7 @@ abstract class OrcSuite extends QueryTest with 
TestHiveSingleton with BeforeAndA
   }
 }
 
-class OrcSourceSuite extends OrcSuite {
+class OrcSourceSuite extends OrcSuite with SQLTestUtils{
   override def beforeAll(): Unit = {
 super.beforeAll()
 
@@ -250,4 +253,31 @@ class OrcSourceSuite extends OrcSuite {
   )).get.toString
 }
   }
+
+  test("SPARK-22972: hive orc source") {
+val tableName = "normal_orc_as_source_hive"
+withTable(tableName) {
+  spark.sql(
+s"""
+  |CREATE TABLE $tableName
+  |USING org.apache.spark.sql.hive.orc
+  |OPTIONS (
+  |  PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}'
+  |)
+""".stripMargin)
+
+  val tableMetadata = spark.sessionState.catalog.getTableMetadata(
+TableIdentifier(tableName))
+  assert(tableMetadata.storage.inputFormat ==
+Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"))
+  assert(tableMetadata.storage.outputFormat ==
+Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
+  assert(tableMetadata.storage.serde ==
+Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+  assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.hive.orc")
+.equals(HiveSerDe.sourceToSerDe("orc")))
+  assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.orc")
+.equals(

spark git commit: [SPARK-23019][CORE] Wait until SparkContext.stop() finished in SparkLauncherSuite

2018-01-10 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master f340b6b30 -> 344e3aab8


[SPARK-23019][CORE] Wait until SparkContext.stop() finished in 
SparkLauncherSuite

## What changes were proposed in this pull request?
In the current code, the `waitFor` call at
https://github.com/apache/spark/blob/cfcd746689c2b84824745fa6d327ffb584c7a17d/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java#L155
only waits until the DAGScheduler is stopped, while SparkContext.clearActiveContext
may not have been called yet:
https://github.com/apache/spark/blob/1c9f95cb771ac78775a77edd1abfeb2d8ae2a124/core/src/main/scala/org/apache/spark/SparkContext.scala#L1924

Thus, in the Jenkins test
https://amplab.cs.berkeley.edu/jenkins/job/spark-branch-2.3-test-maven-hadoop-2.6/,
`JdbcRDDSuite` failed because the previous test, `SparkLauncherSuite`, exited
before SparkContext.stop() had finished.

To reproduce:
```
$ build/sbt
> project core
> testOnly *SparkLauncherSuite *JavaJdbcRDDSuite
```

To fix:
In SparkLauncherSuite, wait for a reasonable amount of time to avoid creating two
active SparkContexts in the same JVM.
I can't come up with any better solution for now.
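
The change below simply sleeps for a fixed 500 ms after stopping the context. As a rough alternative sketch (not what this PR does), the same intent can be expressed as polling a condition with a deadline; `contextFullyStopped` here is a hypothetical predicate standing in for "SparkContext.clearActiveContext has run".

```scala
// Generic wait-until helper: poll `condition` every `pollMs` milliseconds until it
// holds or `timeoutMs` elapses; returns whether the condition was eventually met.
def waitUntil(condition: () => Boolean, timeoutMs: Long, pollMs: Long = 50L): Boolean = {
  val deadline = System.currentTimeMillis() + timeoutMs
  while (System.currentTimeMillis() < deadline) {
    if (condition()) return true
    Thread.sleep(pollMs)
  }
  condition()
}

// waitUntil(() => contextFullyStopped(), timeoutMs = 5000L)
```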

## How was this patch tested?

Unit test

Author: Wang Gengliang 

Closes #20221 from gengliangwang/SPARK-23019.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/344e3aab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/344e3aab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/344e3aab

Branch: refs/heads/master
Commit: 344e3aab87178e45957333479a07e07f202ca1fd
Parents: f340b6b
Author: Wang Gengliang 
Authored: Wed Jan 10 09:44:30 2018 -0800
Committer: Marcelo Vanzin 
Committed: Wed Jan 10 09:44:30 2018 -0800

--
 .../test/java/org/apache/spark/launcher/SparkLauncherSuite.java | 5 +
 1 file changed, 5 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/344e3aab/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
--
diff --git 
a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java 
b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index c2261c2..9d2f563 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 import static org.junit.Assert.*;
@@ -133,6 +134,10 @@ public class SparkLauncherSuite extends BaseSuite {
 p.put(e.getKey(), e.getValue());
   }
   System.setProperties(p);
+  // Here DAGScheduler is stopped, while SparkContext.clearActiveContext 
may not be called yet.
+  // Wait for a reasonable amount of time to avoid creating two active 
SparkContext in JVM.
+  // See SPARK-23019 and SparkContext.stop() for details.
+  TimeUnit.MILLISECONDS.sleep(500);
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-23019][CORE] Wait until SparkContext.stop() finished in SparkLauncherSuite

2018-01-10 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 60d4d79bb -> 5b5851cb6


[SPARK-23019][CORE] Wait until SparkContext.stop() finished in 
SparkLauncherSuite

## What changes were proposed in this pull request?
In the current code, the `waitFor` call at
https://github.com/apache/spark/blob/cfcd746689c2b84824745fa6d327ffb584c7a17d/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java#L155
only waits until the DAGScheduler is stopped, while SparkContext.clearActiveContext
may not have been called yet:
https://github.com/apache/spark/blob/1c9f95cb771ac78775a77edd1abfeb2d8ae2a124/core/src/main/scala/org/apache/spark/SparkContext.scala#L1924

Thus, in the Jenkins test
https://amplab.cs.berkeley.edu/jenkins/job/spark-branch-2.3-test-maven-hadoop-2.6/,
`JdbcRDDSuite` failed because the previous test, `SparkLauncherSuite`, exited
before SparkContext.stop() had finished.

To reproduce:
```
$ build/sbt
> project core
> testOnly *SparkLauncherSuite *JavaJdbcRDDSuite
```

To fix:
In SparkLauncherSuite, wait for a reasonable amount of time to avoid creating two
active SparkContexts in the same JVM.
I can't come up with any better solution for now.

## How was this patch tested?

Unit test

Author: Wang Gengliang 

Closes #20221 from gengliangwang/SPARK-23019.

(cherry picked from commit 344e3aab87178e45957333479a07e07f202ca1fd)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b5851cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b5851cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b5851cb

Branch: refs/heads/branch-2.3
Commit: 5b5851cb685f395574c94174d45a47c4fbf946c8
Parents: 60d4d79
Author: Wang Gengliang 
Authored: Wed Jan 10 09:44:30 2018 -0800
Committer: Marcelo Vanzin 
Committed: Wed Jan 10 09:44:50 2018 -0800

--
 .../test/java/org/apache/spark/launcher/SparkLauncherSuite.java | 5 +
 1 file changed, 5 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5b5851cb/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
--
diff --git 
a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java 
b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index c2261c2..9d2f563 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 import static org.junit.Assert.*;
@@ -133,6 +134,10 @@ public class SparkLauncherSuite extends BaseSuite {
 p.put(e.getKey(), e.getValue());
   }
   System.setProperties(p);
+  // Here DAGScheduler is stopped, while SparkContext.clearActiveContext 
may not be called yet.
+  // Wait for a reasonable amount of time to avoid creating two active 
SparkContext in JVM.
+  // See SPARK-23019 and SparkContext.stop() for details.
+  TimeUnit.MILLISECONDS.sleep(500);
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



svn commit: r24115 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_10_10_01-5b5851c-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-01-10 Thread pwendell
Author: pwendell
Date: Wed Jan 10 18:15:40 2018
New Revision: 24115

Log:
Apache Spark 2.3.0-SNAPSHOT-2018_01_10_10_01-5b5851c docs


[This commit notification would consist of 1439 parts,
which exceeds the limit of 50, so it was shortened to a summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



svn commit: r24116 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_10_12_01-344e3aa-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-01-10 Thread pwendell
Author: pwendell
Date: Wed Jan 10 20:15:34 2018
New Revision: 24116

Log:
Apache Spark 2.3.0-SNAPSHOT-2018_01_10_12_01-344e3aa docs


[This commit notification would consist of 1439 parts,
which exceeds the limit of 50, so it was shortened to a summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-22951][SQL] fix aggregation after dropDuplicates on empty data frames

2018-01-10 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 5b5851cb6 -> eb4fa551e


[SPARK-22951][SQL] fix aggregation after dropDuplicates on empty data frames

## What changes were proposed in this pull request?

(courtesy of liancheng)

Spark SQL supports both global aggregation and grouping aggregation. Global
aggregation always returns a single row with the initial aggregation state as
the output, even when there are zero input rows. Spark implements this by simply
checking the number of grouping keys and treating an aggregation as global
if it has zero grouping keys.

However, this simple principle drops the ball in the following case:

```scala
spark.emptyDataFrame.dropDuplicates().agg(count($"*") as "c").show()
// +---+
// | c |
// +---+
// | 1 |
// +---+
```

The reason is that:

1. `df.dropDuplicates()` is roughly translated into something equivalent to:

```scala
val allColumns = df.columns.map { col }
df.groupBy(allColumns: _*).agg(allColumns.head, allColumns.tail: _*)
```

This translation is implemented in the rule `ReplaceDeduplicateWithAggregate`.

2. `spark.emptyDataFrame` contains zero columns and zero rows.

Therefore, rule `ReplaceDeduplicateWithAggregate` makes a confusing 
transformation roughly equivalent to the following one:

```scala
spark.emptyDataFrame.dropDuplicates()
=> spark.emptyDataFrame.groupBy().agg(Map.empty[String, String])
```

The above transformation is confusing because the resulting aggregate operator 
contains no grouping keys (because `emptyDataFrame` contains no columns), and 
gets recognized as a global aggregation. As a result, Spark SQL allocates a 
single row filled by the initial aggregation state and uses it as the output, 
and returns a wrong result.

To fix this issue, this PR tweaks `ReplaceDeduplicateWithAggregate` by 
appending a literal `1` to the grouping key list of the resulting `Aggregate` 
operator when the input plan contains zero output columns. In this way, 
`spark.emptyDataFrame.dropDuplicates()` is now translated into a grouping 
aggregation, roughly depicted as:

```scala
spark.emptyDataFrame.dropDuplicates()
=> spark.emptyDataFrame.groupBy(lit(1)).agg(Map.empty[String, String])
```

This is now properly treated as a grouping aggregation and returns the correct
answer.
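
For reference, the behavior expected after this change, as described above (a sketch of the intended result in a spark-shell session, not output copied from a test run):

```scala
// Assumes a spark-shell session with org.apache.spark.sql.functions.count imported,
// as in the snippet at the top of this description.
spark.emptyDataFrame.dropDuplicates().agg(count($"*") as "c").show()
// +---+
// |  c|
// +---+
// |  0|
// +---+
```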

## How was this patch tested?

New unit tests added

Author: Feng Liu 

Closes #20174 from liufengdb/fix-duplicate.

(cherry picked from commit 9b33dfc408de986f4203bb0ac0c3f5c56effd69d)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb4fa551
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb4fa551
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb4fa551

Branch: refs/heads/branch-2.3
Commit: eb4fa551e60800269a939b2c1c0ad69e3a801264
Parents: 5b5851c
Author: Feng Liu 
Authored: Wed Jan 10 14:25:04 2018 -0800
Committer: Cheng Lian 
Committed: Wed Jan 10 14:25:33 2018 -0800

--
 .../sql/catalyst/optimizer/Optimizer.scala  |  8 ++-
 .../optimizer/ReplaceOperatorSuite.scala| 10 +++-
 .../spark/sql/DataFrameAggregateSuite.scala | 24 ++--
 3 files changed, 38 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/eb4fa551/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index df0af82..c794ba8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1222,7 +1222,13 @@ object ReplaceDeduplicateWithAggregate extends 
Rule[LogicalPlan] {
   Alias(new First(attr).toAggregateExpression(), 
attr.name)(attr.exprId)
 }
   }
-  Aggregate(keys, aggCols, child)
+  // SPARK-22951: Physical aggregate operators distinguishes global 
aggregation and grouping
+  // aggregations by checking the number of grouping keys. The key 
difference here is that a
+  // global aggregation always returns at least one row even if there are 
no input rows. Here
+  // we append a literal when the grouping key list is empty so that the 
result aggregate
+  // operator is properly treated as a grouping aggregation.
+  val nonemptyKeys = if (keys.isEmpty) Literal(1) :: Nil else keys
+  Aggregate(nonemptyKeys, aggCols, child)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/eb4fa551/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
--

spark git commit: [SPARK-22951][SQL] fix aggregation after dropDuplicates on empty data frames

2018-01-10 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 344e3aab8 -> 9b33dfc40


[SPARK-22951][SQL] fix aggregation after dropDuplicates on empty data frames

## What changes were proposed in this pull request?

(courtesy of liancheng)

Spark SQL supports both global aggregation and grouping aggregation. Global
aggregation always returns a single row with the initial aggregation state as
the output, even when there are zero input rows. Spark implements this by simply
checking the number of grouping keys and treating an aggregation as global
if it has zero grouping keys.

However, this simple principle drops the ball in the following case:

```scala
spark.emptyDataFrame.dropDuplicates().agg(count($"*") as "c").show()
// +---+
// | c |
// +---+
// | 1 |
// +---+
```

The reason is that:

1. `df.dropDuplicates()` is roughly translated into something equivalent to:

```scala
val allColumns = df.columns.map { col }
df.groupBy(allColumns: _*).agg(allColumns.head, allColumns.tail: _*)
```

This translation is implemented in the rule `ReplaceDeduplicateWithAggregate`.

2. `spark.emptyDataFrame` contains zero columns and zero rows.

Therefore, rule `ReplaceDeduplicateWithAggregate` makes a confusing 
transformation roughly equivalent to the following one:

```scala
spark.emptyDataFrame.dropDuplicates()
=> spark.emptyDataFrame.groupBy().agg(Map.empty[String, String])
```

The above transformation is confusing because the resulting aggregate operator 
contains no grouping keys (because `emptyDataFrame` contains no columns), and 
gets recognized as a global aggregation. As a result, Spark SQL allocates a 
single row filled by the initial aggregation state and uses it as the output, 
and returns a wrong result.

To fix this issue, this PR tweaks `ReplaceDeduplicateWithAggregate` by 
appending a literal `1` to the grouping key list of the resulting `Aggregate` 
operator when the input plan contains zero output columns. In this way, 
`spark.emptyDataFrame.dropDuplicates()` is now translated into a grouping 
aggregation, roughly depicted as:

```scala
spark.emptyDataFrame.dropDuplicates()
=> spark.emptyDataFrame.groupBy(lit(1)).agg(Map.empty[String, String])
```

This is now properly treated as a grouping aggregation and returns the correct
answer.

## How was this patch tested?

New unit tests added

Author: Feng Liu 

Closes #20174 from liufengdb/fix-duplicate.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b33dfc4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b33dfc4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b33dfc4

Branch: refs/heads/master
Commit: 9b33dfc408de986f4203bb0ac0c3f5c56effd69d
Parents: 344e3aa
Author: Feng Liu 
Authored: Wed Jan 10 14:25:04 2018 -0800
Committer: Cheng Lian 
Committed: Wed Jan 10 14:25:04 2018 -0800

--
 .../sql/catalyst/optimizer/Optimizer.scala  |  8 ++-
 .../optimizer/ReplaceOperatorSuite.scala| 10 +++-
 .../spark/sql/DataFrameAggregateSuite.scala | 24 ++--
 3 files changed, 38 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9b33dfc4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index df0af82..c794ba8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1222,7 +1222,13 @@ object ReplaceDeduplicateWithAggregate extends 
Rule[LogicalPlan] {
   Alias(new First(attr).toAggregateExpression(), 
attr.name)(attr.exprId)
 }
   }
-  Aggregate(keys, aggCols, child)
+  // SPARK-22951: Physical aggregate operators distinguishes global 
aggregation and grouping
+  // aggregations by checking the number of grouping keys. The key 
difference here is that a
+  // global aggregation always returns at least one row even if there are 
no input rows. Here
+  // we append a literal when the grouping key list is empty so that the 
result aggregate
+  // operator is properly treated as a grouping aggregation.
+  val nonemptyKeys = if (keys.isEmpty) Literal(1) :: Nil else keys
+  Aggregate(nonemptyKeys, aggCols, child)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9b33dfc4/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/c

svn commit: r24120 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_10_16_01-9b33dfc-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-01-10 Thread pwendell
Author: pwendell
Date: Thu Jan 11 00:15:19 2018
New Revision: 24120

Log:
Apache Spark 2.3.0-SNAPSHOT-2018_01_10_16_01-9b33dfc docs


[This commit notification would consist of 1439 parts,
which exceeds the limit of 50, so it was shortened to a summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-23009][PYTHON] Fix for non-str col names to createDataFrame from Pandas

2018-01-10 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 eb4fa551e -> 551ccfba5


[SPARK-23009][PYTHON] Fix for non-str col names to createDataFrame from Pandas

## What changes were proposed in this pull request?

This fixes the case of calling `SparkSession.createDataFrame` using a Pandas
DataFrame that has non-str column labels.

The column name conversion logic for handling non-string or unicode column names in Python 2 is:
```
if column is not any type of string:
    name = str(column)
else if column is unicode in Python 2:
    name = column.encode('utf-8')
```

## How was this patch tested?

Added a new test with a Pandas DataFrame that has int column labels

Author: Bryan Cutler 

Closes #20210 from BryanCutler/python-createDataFrame-int-col-error-SPARK-23009.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/551ccfba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/551ccfba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/551ccfba

Branch: refs/heads/branch-2.3
Commit: 551ccfba529996e987c4d2e8d4dd61c4ab9a2e95
Parents: eb4fa55
Author: Bryan Cutler 
Authored: Wed Jan 10 14:55:24 2018 +0900
Committer: hyukjinkwon 
Committed: Thu Jan 11 09:46:50 2018 +0900

--
 python/pyspark/sql/session.py | 4 +++-
 python/pyspark/sql/tests.py   | 9 +
 2 files changed, 12 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/551ccfba/python/pyspark/sql/session.py
--
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 3e45747..604021c 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -648,7 +648,9 @@ class SparkSession(object):
 
 # If no schema supplied by user then get the names of columns only
 if schema is None:
-schema = [x.encode('utf-8') if not isinstance(x, str) else x 
for x in data.columns]
+schema = [str(x) if not isinstance(x, basestring) else
+  (x.encode('utf-8') if not isinstance(x, str) else x)
+  for x in data.columns]
 
 if self.conf.get("spark.sql.execution.arrow.enabled", 
"false").lower() == "true" \
 and len(data) > 0:

http://git-wip-us.apache.org/repos/asf/spark/blob/551ccfba/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 13576ff..80a94a9 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3532,6 +3532,15 @@ class ArrowTests(ReusedSQLTestCase):
 self.assertTrue(expected[r][e] == result_arrow[r][e] and
 result[r][e] == result_arrow[r][e])
 
+def test_createDataFrame_with_int_col_names(self):
+import numpy as np
+import pandas as pd
+pdf = pd.DataFrame(np.random.rand(4, 2))
+df, df_arrow = self._createDataFrame_toggle(pdf)
+pdf_col_names = [str(c) for c in pdf.columns]
+self.assertEqual(pdf_col_names, df.columns)
+self.assertEqual(pdf_col_names, df_arrow.columns)
+
 
 @unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
 class PandasUDFTests(ReusedSQLTestCase):


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



svn commit: r24124 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_10_18_01-551ccfb-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-01-10 Thread pwendell
Author: pwendell
Date: Thu Jan 11 02:15:27 2018
New Revision: 24124

Log:
Apache Spark 2.3.0-SNAPSHOT-2018_01_10_18_01-551ccfb docs


[This commit notification would consist of 1439 parts,
which exceeds the limit of 50, so it was shortened to a summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-22587] Spark job fails if fs.defaultFS and application jar are different url

2018-01-10 Thread jshao
Repository: spark
Updated Branches:
  refs/heads/master 9b33dfc40 -> a6647ffbf


[SPARK-22587] Spark job fails if fs.defaultFS and application jar are different 
url

## What changes were proposed in this pull request?

The filesystem comparison does not consider the authority of the URI. This matters
for the WASB file storage system, where userInfo is honored to differentiate
filesystems. For example, wasbs://user1xyz.net and wasbs://user2xyz.net should be
considered two different filesystems. Therefore, we have to include the authority
when comparing two filesystems; two filesystems with different authorities cannot
be the same FS.
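
A small illustration of why the authority has to participate in the comparison (the account names below are made up): same scheme, different authority, therefore different filesystems.

```scala
import java.net.URI

val src = new URI("wasbs://container@account1.blob.core.windows.net/jars/app.jar")
val dst = new URI("wasbs://container@account2.blob.core.windows.net/jars/app.jar")

src.getScheme == dst.getScheme       // true  -- scheme alone does not distinguish them
src.getAuthority == dst.getAuthority // false -- so they must not be treated as the same FS
```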

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Mingjie Tang 

Closes #19885 from merlintang/EAR-7377.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6647ffb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6647ffb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6647ffb

Branch: refs/heads/master
Commit: a6647ffbf7a312a3e119a9beef90880cc915aa60
Parents: 9b33dfc
Author: Mingjie Tang 
Authored: Thu Jan 11 11:51:03 2018 +0800
Committer: jerryshao 
Committed: Thu Jan 11 11:51:03 2018 +0800

--
 .../org/apache/spark/deploy/yarn/Client.scala   | 24 +++---
 .../apache/spark/deploy/yarn/ClientSuite.scala  | 33 
 2 files changed, 53 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a6647ffb/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
--
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 15328d0..8cd3cd9 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1421,15 +1421,20 @@ private object Client extends Logging {
   }
 
   /**
-   * Return whether the two file systems are the same.
+   * Return whether two URI represent file system are the same
*/
-  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
-val srcUri = srcFs.getUri()
-val dstUri = destFs.getUri()
+  private[spark] def compareUri(srcUri: URI, dstUri: URI): Boolean = {
+
 if (srcUri.getScheme() == null || srcUri.getScheme() != 
dstUri.getScheme()) {
   return false
 }
 
+val srcAuthority = srcUri.getAuthority()
+val dstAuthority = dstUri.getAuthority()
+if (srcAuthority != null && !srcAuthority.equalsIgnoreCase(dstAuthority)) {
+  return false
+}
+
 var srcHost = srcUri.getHost()
 var dstHost = dstUri.getHost()
 
@@ -1447,6 +1452,17 @@ private object Client extends Logging {
 }
 
 Objects.equal(srcHost, dstHost) && srcUri.getPort() == dstUri.getPort()
+
+  }
+
+  /**
+   * Return whether the two file systems are the same.
+   */
+  protected def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
+val srcUri = srcFs.getUri()
+val dstUri = destFs.getUri()
+
+compareUri(srcUri, dstUri)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/a6647ffb/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
--
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 9d5f5eb..7fa5971 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -357,6 +357,39 @@ class ClientSuite extends SparkFunSuite with Matchers {
 sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new 
File(jar2.toURI).getName)))
   }
 
+  private val matching = Seq(
+("files URI match test1", "file:///file1", "file:///file2"),
+("files URI match test2", "file:///c:file1", "file://c:file2"),
+("files URI match test3", "file://host/file1", "file://host/file2"),
+("wasb URI match test", "wasb://bucket1@user", "wasb://bucket1@user/"),
+("hdfs URI match test", "hdfs:/path1", "hdfs:/path1")
+  )
+
+  matching.foreach { t =>
+  test(t._1) {
+assert(Client.compareUri(new URI(t._2), new URI(t._3)),
+  s"No match between ${t._2} and ${t._3}")
+  }
+  }
+
+  private val unmatching = Seq(
+("files URI unmatch test1", "file:///file1", "file://host/file2"),
+("files URI unmatch test2", "file://host/file1", "file:///file2"),
+("files URI unmatch test3", "file://host/file1

spark git commit: [SPARK-22587] Spark job fails if fs.defaultFS and application jar are different url

2018-01-10 Thread jshao
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 551ccfba5 -> 317b0aaed


[SPARK-22587] Spark job fails if fs.defaultFS and application jar are different 
url

## What changes were proposed in this pull request?

The filesystem comparison does not consider the authority of the URI. This matters
for the WASB file storage system, where userInfo is honored to differentiate
filesystems. For example, wasbs://user1xyz.net and wasbs://user2xyz.net should be
considered two different filesystems. Therefore, we have to include the authority
when comparing two filesystems; two filesystems with different authorities cannot
be the same FS.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Mingjie Tang 

Closes #19885 from merlintang/EAR-7377.

(cherry picked from commit a6647ffbf7a312a3e119a9beef90880cc915aa60)
Signed-off-by: jerryshao 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/317b0aae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/317b0aae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/317b0aae

Branch: refs/heads/branch-2.3
Commit: 317b0aaed83e4bbf66f63ddc0d618da9f1f85085
Parents: 551ccfb
Author: Mingjie Tang 
Authored: Thu Jan 11 11:51:03 2018 +0800
Committer: jerryshao 
Committed: Thu Jan 11 11:51:34 2018 +0800

--
 .../org/apache/spark/deploy/yarn/Client.scala   | 24 +++---
 .../apache/spark/deploy/yarn/ClientSuite.scala  | 33 
 2 files changed, 53 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/317b0aae/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
--
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 15328d0..8cd3cd9 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1421,15 +1421,20 @@ private object Client extends Logging {
   }
 
   /**
-   * Return whether the two file systems are the same.
+   * Return whether two URI represent file system are the same
*/
-  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
-val srcUri = srcFs.getUri()
-val dstUri = destFs.getUri()
+  private[spark] def compareUri(srcUri: URI, dstUri: URI): Boolean = {
+
 if (srcUri.getScheme() == null || srcUri.getScheme() != 
dstUri.getScheme()) {
   return false
 }
 
+val srcAuthority = srcUri.getAuthority()
+val dstAuthority = dstUri.getAuthority()
+if (srcAuthority != null && !srcAuthority.equalsIgnoreCase(dstAuthority)) {
+  return false
+}
+
 var srcHost = srcUri.getHost()
 var dstHost = dstUri.getHost()
 
@@ -1447,6 +1452,17 @@ private object Client extends Logging {
 }
 
 Objects.equal(srcHost, dstHost) && srcUri.getPort() == dstUri.getPort()
+
+  }
+
+  /**
+   * Return whether the two file systems are the same.
+   */
+  protected def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
+val srcUri = srcFs.getUri()
+val dstUri = destFs.getUri()
+
+compareUri(srcUri, dstUri)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/317b0aae/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
--
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 9d5f5eb..7fa5971 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -357,6 +357,39 @@ class ClientSuite extends SparkFunSuite with Matchers {
 sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new 
File(jar2.toURI).getName)))
   }
 
+  private val matching = Seq(
+("files URI match test1", "file:///file1", "file:///file2"),
+("files URI match test2", "file:///c:file1", "file://c:file2"),
+("files URI match test3", "file://host/file1", "file://host/file2"),
+("wasb URI match test", "wasb://bucket1@user", "wasb://bucket1@user/"),
+("hdfs URI match test", "hdfs:/path1", "hdfs:/path1")
+  )
+
+  matching.foreach { t =>
+  test(t._1) {
+assert(Client.compareUri(new URI(t._2), new URI(t._3)),
+  s"No match between ${t._2} and ${t._3}")
+  }
+  }
+
+  private val unmatching = Seq(
+("files URI unmatch test1", "file:///file1", "file://host/file2"),
+("files URI un

svn commit: r24127 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_10_20_01-a6647ff-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-01-10 Thread pwendell
Author: pwendell
Date: Thu Jan 11 04:15:17 2018
New Revision: 24127

Log:
Apache Spark 2.3.0-SNAPSHOT-2018_01_10_20_01-a6647ff docs


[This commit notification would consist of 1439 parts,
which exceeds the limit of 50, so it was shortened to a summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



svn commit: r24128 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_10_22_01-317b0aa-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-01-10 Thread pwendell
Author: pwendell
Date: Thu Jan 11 06:16:07 2018
New Revision: 24128

Log:
Apache Spark 2.3.0-SNAPSHOT-2018_01_10_22_01-317b0aa docs


[This commit notification would consist of 1439 parts,
which exceeds the limit of 50, so it was shortened to a summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org