spark git commit: [SPARK-21315][SQL] Skip some spill files when generateIterator(startIndex) in ExternalAppendOnlyUnsafeRowArray.

2017-07-10 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 833eab2c9 -> 97a1aa2c7


[SPARK-21315][SQL] Skip some spill files when generateIterator(startIndex) in 
ExternalAppendOnlyUnsafeRowArray.

## What changes were proposed in this pull request?

In the current code, `UnboundedFollowingWindowFunctionFrame` is expensive because every call to its 
`write` method iterates from the start of the array up to the lower bound. When traversing the 
iterator from a given start index, it is possible to skip some spill files entirely and thus save 
time, as sketched below.
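
A self-contained sketch of the idea (illustrative only; `plan` and the types here are not Spark's 
actual classes): given the record count of each spill file, any file that ends before `startIndex` 
can be skipped entirely, and only the file containing `startIndex` needs a partial forward scan.

```scala
// Illustrative sketch, not Spark code: decide which spill files must still be opened
// for a requested startIndex, and how many rows to skip inside each opened file.
def plan(spillFileRowCounts: Seq[Int], startIndex: Int): Seq[(Int, Int)] = {
  var consumed = 0
  spillFileRowCounts.zipWithIndex.flatMap { case (rows, fileId) =>
    // Keep the file only if it contains at least one row at or after startIndex.
    val kept =
      if (consumed + rows > startIndex) Some(fileId -> math.max(0, startIndex - consumed))
      else None // the whole file lies before startIndex: skip it without reading it
    consumed += rows
    kept
  }
}

// Example: two spill files of 100 rows each, startIndex = 150 =>
// the first file is skipped entirely, and 50 rows are skipped inside the second file.
// plan(Seq(100, 100), 150) == Seq((1, 50))
```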

## How was this patch tested?

Added unit test

Did a small benchmark:

Put 2,000,200 rows into `UnsafeExternalSorter` -- 2 spill files (each containing 1,000,000 rows) 
and an inMemSorter containing 200 rows.
Move the iterator forward to index=2000001.

*With this change*:
`getIterator(2000001)` costs almost 0ms~1ms;
*Without this change*:
`for (int i = 0; i < 2000001; i++) getIterator().loadNext()` costs about 300ms.
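
For reference, a generic timing helper of the sort used to get the numbers above (illustrative 
only; `sorter` stands in for an already-populated `UnsafeExternalSorter` and is not constructed here):

```scala
// Illustrative helper: wall-clock time of a block, in milliseconds.
def timeMs[T](body: => T): Long = {
  val start = System.nanoTime()
  body
  (System.nanoTime() - start) / 1000000L
}

// timeMs(sorter.getIterator(startIndex))  // with this change: skip straight to startIndex
// timeMs { val it = sorter.getIterator(0); (0 until startIndex).foreach(_ => it.loadNext()) }  // without
```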

Author: jinxing 

Closes #18541 from jinxing64/SPARK-21315.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97a1aa2c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97a1aa2c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97a1aa2c

Branch: refs/heads/master
Commit: 97a1aa2c70b1bf726d5f572789e150d168ac61e5
Parents: 833eab2
Author: jinxing 
Authored: Tue Jul 11 11:47:47 2017 +0800
Committer: Wenchen Fan 
Committed: Tue Jul 11 11:47:47 2017 +0800

--
 .../unsafe/sort/UnsafeExternalSorter.java   | 35 +---
 .../unsafe/sort/UnsafeSorterSpillWriter.java|  4 +++
 .../unsafe/sort/UnsafeExternalSorterSuite.java  | 34 ++-
 .../ExternalAppendOnlyUnsafeRowArray.scala  | 22 ++--
 ...ernalAppendOnlyUnsafeRowArrayBenchmark.scala |  2 +-
 5 files changed, 70 insertions(+), 27 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/97a1aa2c/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 82d03e3..a6e858c 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -589,29 +589,54 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   }
 
   /**
-   * Returns a iterator, which will return the rows in the order as inserted.
+   * Returns an iterator starts from startIndex, which will return the rows in the order as
+   * inserted.
    *
    * It is the caller's responsibility to call `cleanupResources()`
    * after consuming this iterator.
    *
    * TODO: support forced spilling
    */
-  public UnsafeSorterIterator getIterator() throws IOException {
+  public UnsafeSorterIterator getIterator(int startIndex) throws IOException {
     if (spillWriters.isEmpty()) {
       assert(inMemSorter != null);
-      return inMemSorter.getSortedIterator();
+      UnsafeSorterIterator iter = inMemSorter.getSortedIterator();
+      moveOver(iter, startIndex);
+      return iter;
     } else {
       LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
+      int i = 0;
       for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
-        queue.add(spillWriter.getReader(serializerManager));
+        if (i + spillWriter.recordsSpilled() > startIndex) {
+          UnsafeSorterIterator iter = spillWriter.getReader(serializerManager);
+          moveOver(iter, startIndex - i);
+          queue.add(iter);
+        }
+        i += spillWriter.recordsSpilled();
       }
       if (inMemSorter != null) {
-        queue.add(inMemSorter.getSortedIterator());
+        UnsafeSorterIterator iter = inMemSorter.getSortedIterator();
+        moveOver(iter, startIndex - i);
+        queue.add(iter);
       }
       return new ChainedIterator(queue);
     }
   }
 
+  private void moveOver(UnsafeSorterIterator iter, int steps)
+      throws IOException {
+    if (steps > 0) {
+      for (int i = 0; i < steps; i++) {
+        if (iter.hasNext()) {
+          iter.loadNext();
+        } else {
+          throw new ArrayIndexOutOfBoundsException("Failed to move the iterator " + steps +
+            " steps forward");
+        }
+      }
+    }
+  }
+
   /**
    * Chain multiple UnsafeSorterIterator together as single one.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/97a1aa2c/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java

spark git commit: [SPARK-21369][CORE] Don't use Scala Tuple2 in common/network-*

2017-07-10 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 a05edf454 -> edcd9fbc9


[SPARK-21369][CORE] Don't use Scala Tuple2 in common/network-*

## What changes were proposed in this pull request?

Remove all usages of Scala Tuple2 from common/network-* projects. Otherwise, 
Yarn users cannot use `spark.reducer.maxReqSizeShuffleToMem`.

## How was this patch tested?

Jenkins.

Author: Shixiong Zhu 

Closes #18593 from zsxwing/SPARK-21369.

(cherry picked from commit 833eab2c9bd273ee9577fbf9e480d3e3a4b7d203)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/edcd9fbc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/edcd9fbc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/edcd9fbc

Branch: refs/heads/branch-2.2
Commit: edcd9fbc92683753d55ed0c69f391bf3bed59da4
Parents: a05edf4
Author: Shixiong Zhu 
Authored: Tue Jul 11 11:26:17 2017 +0800
Committer: Wenchen Fan 
Committed: Tue Jul 11 11:26:28 2017 +0800

--
 common/network-common/pom.xml   |  3 ++-
 .../client/TransportResponseHandler.java| 20 ++--
 .../network/server/OneForOneStreamManager.java  | 17 +
 common/network-shuffle/pom.xml  |  1 +
 common/network-yarn/pom.xml |  1 +
 5 files changed, 19 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/edcd9fbc/common/network-common/pom.xml
--
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 7577253..303e25f 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -90,7 +90,8 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
-    </dependency>
+      <scope>test</scope>
+    </dependency>
 
 

spark git commit: [SPARK-21369][CORE] Don't use Scala Tuple2 in common/network-*

2017-07-10 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 1471ee7af -> 833eab2c9


[SPARK-21369][CORE] Don't use Scala Tuple2 in common/network-*

## What changes were proposed in this pull request?

Remove all usages of Scala Tuple2 from common/network-* projects. Otherwise, 
Yarn users cannot use `spark.reducer.maxReqSizeShuffleToMem`.

## How was this patch tested?

Jenkins.

Author: Shixiong Zhu 

Closes #18593 from zsxwing/SPARK-21369.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/833eab2c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/833eab2c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/833eab2c

Branch: refs/heads/master
Commit: 833eab2c9bd273ee9577fbf9e480d3e3a4b7d203
Parents: 1471ee7
Author: Shixiong Zhu 
Authored: Tue Jul 11 11:26:17 2017 +0800
Committer: Wenchen Fan 
Committed: Tue Jul 11 11:26:17 2017 +0800

--
 common/network-common/pom.xml   |  3 ++-
 .../client/TransportResponseHandler.java| 20 ++--
 .../network/server/OneForOneStreamManager.java  | 17 +
 common/network-shuffle/pom.xml  |  1 +
 common/network-yarn/pom.xml |  1 +
 5 files changed, 19 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/833eab2c/common/network-common/pom.xml
--
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 066970f..0254d0c 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -90,7 +90,8 @@
     <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-tags_${scala.binary.version}</artifactId>
-    </dependency>
+      <scope>test</scope>
+    </dependency>
 
 

spark git commit: [SPARK-21350][SQL] Fix the error message when the number of arguments is wrong when invoking a UDF

2017-07-10 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master a2bec6c92 -> 1471ee7af


[SPARK-21350][SQL] Fix the error message when the number of arguments is wrong 
when invoking a UDF

### What changes were proposed in this pull request?
Users get a very confusing error when they specify the wrong number of parameters.
```Scala
val df = spark.emptyDataFrame
spark.udf.register("foo", (_: String).length)
df.selectExpr("foo(2, 3, 4)")
```
```
org.apache.spark.sql.UDFSuite$$anonfun$9$$anonfun$apply$mcV$sp$12 cannot be cast to scala.Function3
java.lang.ClassCastException: org.apache.spark.sql.UDFSuite$$anonfun$9$$anonfun$apply$mcV$sp$12 cannot be cast to scala.Function3
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:109)
```

This PR is to capture the exception and issue an error message that is 
consistent with what we did for built-in functions. After the fix, the error 
message is improved to
```
Invalid number of arguments for function foo; line 1 pos 0
org.apache.spark.sql.AnalysisException: Invalid number of arguments for function foo; line 1 pos 0
  at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:119)
```
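
A minimal, self-contained sketch of the arity check this patch adds inside each generated 
`register` overload (`Expr` and the exception type are illustrative stand-ins, not Spark's 
`Expression`/`AnalysisException`):

```scala
// Illustrative sketch: reject a UDF call whose argument count does not match the
// registered function's arity, before any expression is ever constructed.
final case class Expr(value: Any)

def makeBuilder(name: String, arity: Int, construct: Seq[Expr] => Expr): Seq[Expr] => Expr = {
  args =>
    if (args.length == arity) {
      construct(args)
    } else {
      throw new IllegalArgumentException(
        s"Invalid number of arguments for function $name. Expected: $arity; Found: ${args.length}")
    }
}

// A 1-argument "foo" invoked with 3 arguments now fails fast with a clear message:
val fooBuilder = makeBuilder("foo", arity = 1, construct = args => Expr(args.head.value.toString.length))
// fooBuilder(Seq(Expr(2), Expr(3), Expr(4)))  // throws "Invalid number of arguments for function foo..."
```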

### How was this patch tested?
Added a test case

Author: gatorsmile 

Closes #18574 from gatorsmile/statsCheck.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1471ee7a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1471ee7a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1471ee7a

Branch: refs/heads/master
Commit: 1471ee7af5a9952b60cf8c56d60cb6a7ec46cc69
Parents: a2bec6c
Author: gatorsmile 
Authored: Tue Jul 11 11:19:59 2017 +0800
Committer: Wenchen Fan 
Committed: Tue Jul 11 11:19:59 2017 +0800

--
 .../org/apache/spark/sql/UDFRegistration.scala  | 412 ++-
 .../test/org/apache/spark/sql/JavaUDFSuite.java |   8 +
 .../scala/org/apache/spark/sql/UDFSuite.scala   |  13 +-
 3 files changed, 331 insertions(+), 102 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1471ee7a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index 8bdc022..c4d0adb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -111,7 +111,12 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
 def register[$typeTags](name: String, func: Function$x[$types]): 
UserDefinedFunction = {
   val ScalaReflection.Schema(dataType, nullable) = 
ScalaReflection.schemaFor[RT]
   val inputTypes = Try($inputTypes).toOption
-  def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, 
inputTypes.getOrElse(Nil), Some(name), nullable)
+  def builder(e: Seq[Expression]) = if (e.length == $x) {
+ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), 
nullable)
+  } else {
+ throw new AnalysisException("Invalid number of arguments for 
function " + name +
+   ". Expected: $x; Found: " + e.length)
+  }
   functionRegistry.createOrReplaceTempFunction(name, builder)
   UserDefinedFunction(func, dataType, 
inputTypes).withName(name).withNullability(nullable)
 }""")
@@ -123,16 +128,20 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
   val anyCast = s".asInstanceOf[UDF$i[$anyTypeArgs, Any]]"
   val anyParams = (1 to i).map(_ => "_: Any").mkString(", ")
   println(s"""
- |/**
- | * Register a user-defined function with ${i} arguments.
- | * @since 1.3.0
- | */
- |def register(name: String, f: UDF$i[$extTypeArgs, _], returnType: 
DataType): Unit = {
- |  val func = f$anyCast.call($anyParams)
- |  functionRegistry.createOrReplaceTempFunction(
- |name,
- |(e: Seq[Expression]) => ScalaUDF(func, returnType, e))
- |}""".stripMargin)
+|/**
+| * Register a user-defined function with ${i} arguments.
+| * @since 1.3.0
+| */
+|def register(name: String, f: UDF$i[$extTypeArgs, _], returnType: 
DataType): Unit = {
+|  val func = f$anyCast.call($anyParams)
+|def builder(e: Seq[Expression]) = if (e.length == $i) {
+|  ScalaUDF(func, returnType, e)
+|} else {
+|  throw new AnalysisException("Invalid number of arguments for 
function " + name +

spark git commit: [SPARK-21043][SQL] Add unionByName in Dataset

2017-07-10 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master c3713fde8 -> a2bec6c92


[SPARK-21043][SQL] Add unionByName in Dataset

## What changes were proposed in this pull request?
This PR adds `unionByName` to `Dataset`.
Here is how to use it:
```
val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.unionByName(df2).show

// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// |   1|   2|   3|
// |   6|   4|   5|
// +----+----+----+
```
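
For contrast, continuing the same example, the position-based `union` pairs columns by position, 
so df2's differing column order silently produces mismatched values:

```scala
df1.union(df2).show

// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// |   1|   2|   3|
// |   4|   5|   6|   <- df2's (col1, col2, col0) values land under (col0, col1, col2)
// +----+----+----+
```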

## How was this patch tested?
Added tests in `DataFrameSuite`.

Author: Takeshi Yamamuro 

Closes #18300 from maropu/SPARK-21043-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2bec6c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2bec6c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2bec6c9

Branch: refs/heads/master
Commit: a2bec6c92a063f4a8e9ed75a9f3f06808485b6d7
Parents: c3713fd
Author: Takeshi Yamamuro 
Authored: Mon Jul 10 20:16:29 2017 -0700
Committer: gatorsmile 
Committed: Mon Jul 10 20:16:29 2017 -0700

--
 .../scala/org/apache/spark/sql/Dataset.scala| 60 ++
 .../org/apache/spark/sql/DataFrameSuite.scala   | 87 
 2 files changed, 147 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a2bec6c9/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index a777383..7f3ae05 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -53,6 +53,7 @@ import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.execution.stat.StatFunctions
 import org.apache.spark.sql.streaming.DataStreamWriter
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.Utils
@@ -1735,6 +1736,65 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * Returns a new Dataset containing union of rows in this Dataset and 
another Dataset.
+   *
+   * This is different from both `UNION ALL` and `UNION DISTINCT` in SQL. To 
do a SQL-style set
+   * union (that does deduplication of elements), use this function followed 
by a [[distinct]].
+   *
+   * The difference between this function and [[union]] is that this function
+   * resolves columns by name (not by position):
+   *
+   * {{{
+   *   val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
+   *   val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
+   *   df1.unionByName(df2).show
+   *
+   *   // output:
+   *   // +----+----+----+
+   *   // |col0|col1|col2|
+   *   // +----+----+----+
+   *   // |   1|   2|   3|
+   *   // |   6|   4|   5|
+   *   // +----+----+----+
+   * }}}
+   *
+   * @group typedrel
+   * @since 2.3.0
+   */
+  def unionByName(other: Dataset[T]): Dataset[T] = withSetOperator {
+// Check column name duplication
+val resolver = sparkSession.sessionState.analyzer.resolver
+val leftOutputAttrs = logicalPlan.output
+val rightOutputAttrs = other.logicalPlan.output
+
+SchemaUtils.checkColumnNameDuplication(
+  leftOutputAttrs.map(_.name),
+  "in the left attributes",
+  sparkSession.sessionState.conf.caseSensitiveAnalysis)
+SchemaUtils.checkColumnNameDuplication(
+  rightOutputAttrs.map(_.name),
+  "in the right attributes",
+  sparkSession.sessionState.conf.caseSensitiveAnalysis)
+
+// Builds a project list for `other` based on `logicalPlan` output names
+val rightProjectList = leftOutputAttrs.map { lattr =>
+  rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) 
}.getOrElse {
+throw new AnalysisException(
+  s"""Cannot resolve column name "${lattr.name}" among """ +
+s"""(${rightOutputAttrs.map(_.name).mkString(", ")})""")
+  }
+}
+
+// Delegates failure checks to `CheckAnalysis`
+val notFoundAttrs = rightOutputAttrs.diff(rightProjectList)
+val rightChild = Project(rightProjectList ++ notFoundAttrs, 
other.logicalPlan)
+
+// This breaks caching, but it's usually ok because it addresses a very 
specific use case:
+// using union to union many files or partitions.
+CombineUnions(Union(logicalPlan, rightChild))
+  }
+
+  /**
* Returns a new Dataset containing rows only in both this Dataset and 
another Dataset.
* This is equivalent to `INTERSECT` in SQL.
*


spark git commit: [SPARK-21358][EXAMPLES] Argument of repartitionandsortwithinpartitions at pyspark

2017-07-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master d03aebbe6 -> c3713fde8


[SPARK-21358][EXAMPLES] Argument of repartitionandsortwithinpartitions at 
pyspark

## What changes were proposed in this pull request?
In the repartitionAndSortWithinPartitions example in rdd.py, the third argument should be True or 
False. This patch fixes the example code.

## How was this patch tested?
* Renamed test_repartitionAndSortWithinPartitions to test_repartitionAndSortWithinPartitions_asc 
to make the boolean argument explicit.
* Added test_repartitionAndSortWithinPartitions_desc to test False as the third argument.


Author: chie8842 

Closes #18586 from chie8842/SPARK-21358.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3713fde
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3713fde
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3713fde

Branch: refs/heads/master
Commit: c3713fde86204bf3f027483914ff9e60e7aad261
Parents: d03aebb
Author: chie8842 
Authored: Mon Jul 10 18:56:54 2017 -0700
Committer: Reynold Xin 
Committed: Mon Jul 10 18:56:54 2017 -0700

--
 python/pyspark/rdd.py   |  2 +-
 python/pyspark/tests.py | 12 ++--
 2 files changed, 11 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c3713fde/python/pyspark/rdd.py
--
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 7dfa17f..3325b65 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -608,7 +608,7 @@ class RDD(object):
 sort records by their keys.
 
         >>> rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])
-        >>> rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, 2)
+        >>> rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, True)
         >>> rdd2.glom().collect()
         [[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]
         """

http://git-wip-us.apache.org/repos/asf/spark/blob/c3713fde/python/pyspark/tests.py
--
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index bb13de5..73ab442 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1019,14 +1019,22 @@ class RDDTests(ReusedPySparkTestCase):
         self.assertEqual((["ab", "ef"], [5]), rdd.histogram(1))
         self.assertRaises(TypeError, lambda: rdd.histogram(2))
 
-    def test_repartitionAndSortWithinPartitions(self):
+    def test_repartitionAndSortWithinPartitions_asc(self):
         rdd = self.sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)], 2)
 
-        repartitioned = rdd.repartitionAndSortWithinPartitions(2, lambda key: key % 2)
+        repartitioned = rdd.repartitionAndSortWithinPartitions(2, lambda key: key % 2, True)
         partitions = repartitioned.glom().collect()
         self.assertEqual(partitions[0], [(0, 5), (0, 8), (2, 6)])
         self.assertEqual(partitions[1], [(1, 3), (3, 8), (3, 8)])
 
+    def test_repartitionAndSortWithinPartitions_desc(self):
+        rdd = self.sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)], 2)
+
+        repartitioned = rdd.repartitionAndSortWithinPartitions(2, lambda key: key % 2, False)
+        partitions = repartitioned.glom().collect()
+        self.assertEqual(partitions[0], [(2, 6), (0, 5), (0, 8)])
+        self.assertEqual(partitions[1], [(3, 8), (3, 8), (1, 3)])
+
     def test_repartition_no_skewed(self):
         num_partitions = 20
         a = self.sc.parallelize(range(int(1000)), 2)





spark git commit: [SPARK-13534][PYSPARK] Using Apache Arrow to increase performance of DataFrame.toPandas

2017-07-10 Thread holden
Repository: spark
Updated Branches:
  refs/heads/master 2bfd5accd -> d03aebbe6


[SPARK-13534][PYSPARK] Using Apache Arrow to increase performance of 
DataFrame.toPandas

## What changes were proposed in this pull request?
Integrate Apache Arrow with Spark to increase performance of 
`DataFrame.toPandas`.  This has been done by using Arrow to convert data 
partitions on the executor JVM to Arrow payload byte arrays where they are then 
served to the Python process.  The Python DataFrame can then collect the Arrow 
payloads where they are combined and converted to a Pandas DataFrame.  Data 
types except complex, date, timestamp, and decimal  are currently supported, 
otherwise an `UnsupportedOperation` exception is thrown.

Additions to Spark include a Scala package private method 
`Dataset.toArrowPayload` that will convert data partitions in the executor JVM 
to `ArrowPayload`s as byte arrays so they can be easily served.  A package 
private class/object `ArrowConverters` that provide data type mappings and 
conversion routines.  In Python, a private method `DataFrame._collectAsArrow` 
is added to collect Arrow payloads and a SQLConf 
"spark.sql.execution.arrow.enable" can be used in `toPandas()` to enable using 
Arrow (uses the old conversion by default).

## How was this patch tested?
Added a new test suite `ArrowConvertersSuite` that will run tests on conversion 
of Datasets to Arrow payloads for supported types.  The suite will generate a 
Dataset and matching Arrow JSON data, then the dataset is converted to an Arrow 
payload and finally validated against the JSON data.  This will ensure that the 
schema and data has been converted correctly.

Added PySpark tests to verify the `toPandas` method is producing equal 
DataFrames with and without pyarrow.  A roundtrip test to ensure the pandas 
DataFrame produced by pyspark is equal to a one made directly with pandas.

Author: Bryan Cutler 
Author: Li Jin 
Author: Li Jin 
Author: Wes McKinney 

Closes #18459 from BryanCutler/toPandas_with_arrow-SPARK-13534.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d03aebbe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d03aebbe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d03aebbe

Branch: refs/heads/master
Commit: d03aebbe6508ba441dc87f9546f27aeb27553d77
Parents: 2bfd5ac
Author: Bryan Cutler 
Authored: Mon Jul 10 15:21:03 2017 -0700
Committer: Holden Karau 
Committed: Mon Jul 10 15:21:03 2017 -0700

--
 bin/pyspark |2 +-
 dev/deps/spark-deps-hadoop-2.6  |5 +
 dev/deps/spark-deps-hadoop-2.7  |5 +
 pom.xml |   20 +
 python/pyspark/serializers.py   |   17 +
 python/pyspark/sql/dataframe.py |   48 +-
 python/pyspark/sql/tests.py |   78 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |   22 +
 sql/core/pom.xml|4 +
 .../scala/org/apache/spark/sql/Dataset.scala|   20 +
 .../sql/execution/arrow/ArrowConverters.scala   |  429 ++
 .../execution/arrow/ArrowConvertersSuite.scala  | 1222 ++
 12 files changed, 1859 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d03aebbe/bin/pyspark
--
diff --git a/bin/pyspark b/bin/pyspark
index d3b512e..dd28627 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -68,7 +68,7 @@ if [[ -n "$SPARK_TESTING" ]]; then
   unset YARN_CONF_DIR
   unset HADOOP_CONF_DIR
   export PYTHONHASHSEED=0
-  exec "$PYSPARK_DRIVER_PYTHON" -m "$1"
+  exec "$PYSPARK_DRIVER_PYTHON" -m "$@"
   exit
 fi
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d03aebbe/dev/deps/spark-deps-hadoop-2.6
--
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index c132531..1a6515b 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -13,6 +13,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
 api-asn1-api-1.0.0-M20.jar
 api-util-1.0.0-M20.jar
 arpack_combined_all-0.1.jar
+arrow-format-0.4.0.jar
+arrow-memory-0.4.0.jar
+arrow-vector-0.4.0.jar
 avro-1.7.7.jar
 avro-ipc-1.7.7.jar
 avro-mapred-1.7.7-hadoop2.jar
@@ -55,6 +58,7 @@ datanucleus-core-3.2.10.jar
 datanucleus-rdbms-3.2.9.jar
 derby-10.12.1.1.jar
 eigenbase-properties-1.1.5.jar
+flatbuffers-1.2.0-3f79e055.jar
 gson-2.2.4.jar
 guava-14.0.1.jar
 guice-3.0.jar
@@ -77,6 +81,7 @@ hadoop-yarn-server-web-proxy-2.6.5.jar
 hk2-api-2.4.0-b34.jar
 hk2-locator-2.4.0-b34.jar
 

[spark] Git Push Summary

2017-07-10 Thread marmbrus
Repository: spark
Updated Tags:  refs/tags/v2.1.0-rc4 [deleted] ec3172658




[spark] Git Push Summary

2017-07-10 Thread marmbrus
Repository: spark
Updated Tags:  refs/tags/v2.1.0-rc5 [deleted] cd0a08361




[spark] Git Push Summary

2017-07-10 Thread marmbrus
Repository: spark
Updated Tags:  refs/tags/v2.1.0-rc3 [deleted] ef2ccf942




[spark] Git Push Summary

2017-07-10 Thread marmbrus
Repository: spark
Updated Tags:  refs/tags/v2.1.0-rc1 [deleted] 80aabc0bd




[spark] Git Push Summary

2017-07-10 Thread marmbrus
Repository: spark
Updated Tags:  refs/tags/v2.1.0-rc2 [deleted] 080717497




[spark] Git Push Summary

2017-07-10 Thread marmbrus
Repository: spark
Updated Tags:  refs/tags/v2.2.0-rc4 [deleted] 377cfa8ac




[spark] Git Push Summary

2017-07-10 Thread marmbrus
Repository: spark
Updated Tags:  refs/tags/v2.2.0-rc1 [deleted] 8ccb4a57c




[spark] Git Push Summary

2017-07-10 Thread marmbrus
Repository: spark
Updated Tags:  refs/tags/v2.2.0-rc2 [deleted] 1d4017b44




[spark] Git Push Summary

2017-07-10 Thread marmbrus
Repository: spark
Updated Tags:  refs/tags/v2.2.0-rc6 [deleted] a2c7b2133




[spark] Git Push Summary

2017-07-10 Thread marmbrus
Repository: spark
Updated Tags:  refs/tags/v2.2.0-rc5 [deleted] 62e442e73




[spark] Git Push Summary

2017-07-10 Thread marmbrus
Repository: spark
Updated Tags:  refs/tags/v2.2.0-rc3 [deleted] cc5dbd55b




[spark] Git Push Summary

2017-07-10 Thread marmbrus
Repository: spark
Updated Tags:  refs/tags/v2.2.0 [created] a2c7b2133




svn commit: r20396 - /dev/spark/spark-2.2.0-rc6/ /release/spark/spark-2.2.0/

2017-07-10 Thread marmbrus
Author: marmbrus
Date: Mon Jul 10 22:11:42 2017
New Revision: 20396

Log:
Release Spark 2.2.0

Added:
release/spark/spark-2.2.0/
  - copied from r20395, dev/spark/spark-2.2.0-rc6/
Removed:
dev/spark/spark-2.2.0-rc6/





svn commit: r20394 - /dev/spark/spark-2.2.0-rc6/

2017-07-10 Thread marmbrus
Author: marmbrus
Date: Mon Jul 10 19:25:36 2017
New Revision: 20394

Log:
Add spark-2.2.0-rc6

Added:
dev/spark/spark-2.2.0-rc6/
dev/spark/spark-2.2.0-rc6/SparkR_2.2.0.tar.gz   (with props)
dev/spark/spark-2.2.0-rc6/SparkR_2.2.0.tar.gz.asc
dev/spark/spark-2.2.0-rc6/SparkR_2.2.0.tar.gz.md5
dev/spark/spark-2.2.0-rc6/SparkR_2.2.0.tar.gz.sha
dev/spark/spark-2.2.0-rc6/pyspark-2.2.0.tar.gz   (with props)
dev/spark/spark-2.2.0-rc6/pyspark-2.2.0.tar.gz.asc
dev/spark/spark-2.2.0-rc6/pyspark-2.2.0.tar.gz.md5
dev/spark/spark-2.2.0-rc6/pyspark-2.2.0.tar.gz.sha
dev/spark/spark-2.2.0-rc6/spark-2.2.0-bin-hadoop2.6.tgz   (with props)
dev/spark/spark-2.2.0-rc6/spark-2.2.0-bin-hadoop2.6.tgz.asc
dev/spark/spark-2.2.0-rc6/spark-2.2.0-bin-hadoop2.6.tgz.md5
dev/spark/spark-2.2.0-rc6/spark-2.2.0-bin-hadoop2.6.tgz.sha
dev/spark/spark-2.2.0-rc6/spark-2.2.0-bin-hadoop2.7.tgz   (with props)
dev/spark/spark-2.2.0-rc6/spark-2.2.0-bin-hadoop2.7.tgz.asc
dev/spark/spark-2.2.0-rc6/spark-2.2.0-bin-hadoop2.7.tgz.md5
dev/spark/spark-2.2.0-rc6/spark-2.2.0-bin-hadoop2.7.tgz.sha
dev/spark/spark-2.2.0-rc6/spark-2.2.0-bin-without-hadoop.tgz   (with props)
dev/spark/spark-2.2.0-rc6/spark-2.2.0-bin-without-hadoop.tgz.asc
dev/spark/spark-2.2.0-rc6/spark-2.2.0-bin-without-hadoop.tgz.md5
dev/spark/spark-2.2.0-rc6/spark-2.2.0-bin-without-hadoop.tgz.sha
dev/spark/spark-2.2.0-rc6/spark-2.2.0.tgz   (with props)
dev/spark/spark-2.2.0-rc6/spark-2.2.0.tgz.asc
dev/spark/spark-2.2.0-rc6/spark-2.2.0.tgz.md5
dev/spark/spark-2.2.0-rc6/spark-2.2.0.tgz.sha

Added: dev/spark/spark-2.2.0-rc6/SparkR_2.2.0.tar.gz
==
Binary file - no diff available.

Propchange: dev/spark/spark-2.2.0-rc6/SparkR_2.2.0.tar.gz
--
svn:mime-type = application/octet-stream

Added: dev/spark/spark-2.2.0-rc6/SparkR_2.2.0.tar.gz.asc
==
--- dev/spark/spark-2.2.0-rc6/SparkR_2.2.0.tar.gz.asc (added)
+++ dev/spark/spark-2.2.0-rc6/SparkR_2.2.0.tar.gz.asc Mon Jul 10 19:25:36 2017
@@ -0,0 +1,11 @@
+-BEGIN PGP SIGNATURE-
+Version: GnuPG v2.0.14 (GNU/Linux)
+
+iQEcBAABAgAGBQJZVtv9AAoJEHxsEF/8jtCJo4cIAJ3OOFwUo3gw6SmPkwZyOTd4
+Tzs53q2jNpozxocw+6vgyYzQEs8YIXTJqybV6wWh9l5pp3/fACQBhe1PzC74lcnW
+KS5nnzwG2Rzex6IyRynrv1No0vGkkQGvGHAvb+JwD94kPFliod2bkXNhuGRNttL2
+j60BoW4Aq9jWT/G3li2/7O+OzdZ7hC1GCiWPEWUrNJLWwyg8krnjbKTYFsEpWU4E
+S5HeWvG+HYWncCdMY4LLkUSywkcNfgdwQW+regbt8tgISkBndIrmhsT01qL7rn/l
+nKdUFm3CNQPfBuZgKXx6HnpgKHD142pGkrEKcbFRn1IDpfuN8iH4Qe+1HR8NsTM=
+=V2Zc
+-END PGP SIGNATURE-

Added: dev/spark/spark-2.2.0-rc6/SparkR_2.2.0.tar.gz.md5
==
--- dev/spark/spark-2.2.0-rc6/SparkR_2.2.0.tar.gz.md5 (added)
+++ dev/spark/spark-2.2.0-rc6/SparkR_2.2.0.tar.gz.md5 Mon Jul 10 19:25:36 2017
@@ -0,0 +1 @@
+SparkR_2.2.0.tar.gz: 82 00 CE F7 80 D0 50 BD  9B F9 6D AE F4 87 04 0C

Added: dev/spark/spark-2.2.0-rc6/SparkR_2.2.0.tar.gz.sha
==
--- dev/spark/spark-2.2.0-rc6/SparkR_2.2.0.tar.gz.sha (added)
+++ dev/spark/spark-2.2.0-rc6/SparkR_2.2.0.tar.gz.sha Mon Jul 10 19:25:36 2017
@@ -0,0 +1,3 @@
+SparkR_2.2.0.tar.gz: 9E34CE50 380CAF13 30477DED 1A502499 C6BC983A 2D27B825
+ 3402CECE C553D808 2B3C8A27 E25ED02A 44EA3653 A22D1F26
+ 38C672EB AC59527D 4E1B2DB9 CB3FB8C0

Added: dev/spark/spark-2.2.0-rc6/pyspark-2.2.0.tar.gz
==
Binary file - no diff available.

Propchange: dev/spark/spark-2.2.0-rc6/pyspark-2.2.0.tar.gz
--
svn:mime-type = application/octet-stream

Added: dev/spark/spark-2.2.0-rc6/pyspark-2.2.0.tar.gz.asc
==
--- dev/spark/spark-2.2.0-rc6/pyspark-2.2.0.tar.gz.asc (added)
+++ dev/spark/spark-2.2.0-rc6/pyspark-2.2.0.tar.gz.asc Mon Jul 10 19:25:36 2017
@@ -0,0 +1,11 @@
+-BEGIN PGP SIGNATURE-
+Version: GnuPG v2.0.14 (GNU/Linux)
+
+iQEcBAABAgAGBQJZVtopAAoJEHxsEF/8jtCJXcsIALgsHAhv/BtEJNNWRIam/HPX
+VQWqty1spVn2DScdLhALWTGg7RO2L3xi4f5hbCbkKyo8gqTmXPYCnkUs1U/7B4Bf
+3HtEoGrBzdh+kcNV2CJvRDvp2aTFPDOtFAgNCqMhjB77TYsXCHbBqWO2Yp2Ybi2m
+73JroR6WpehLy4UE95M7JEtv8GcfkYKkkTXfejJmKzK9YW5phw+ZP0H0qG/HWaYX
+H3e4qpjS+iDu7vCo3vv/H2wwml9WdjAiqNMj405SjHAEW32lEygrsEeezzlwreGg
+gKoSjhDU8XBPJPaTMSv56g5bTpfSsrGtES6OjIN20B7mXpoI9KqRCg++t2KzDCI=
+=1iIJ
+-END PGP SIGNATURE-

Added: dev/spark/spark-2.2.0-rc6/pyspark-2.2.0.tar.gz.md5
==
--- dev/spark/spark-2.2.0-rc6/pyspark-2.2.0.tar.gz.md5 (added)
+++ 

spark git commit: [SPARK-21266][R][PYTHON] Support schema a DDL-formatted string in dapply/gapply/from_json

2017-07-10 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/master 18b3b00ec -> 2bfd5accd


[SPARK-21266][R][PYTHON] Support schema a DDL-formatted string in 
dapply/gapply/from_json

## What changes were proposed in this pull request?

This PR adds support for specifying the schema as a DDL-formatted string for `from_json` in 
R/Python and for `dapply` and `gapply` in R, which is commonly used and/or consistent with the 
Scala APIs.

Additionally, this PR exposes `structType` in R to allow working around other possible corner cases.

**Python**

`from_json`

```python
from pyspark.sql.functions import from_json

data = [(1, '''{"a": 1}''')]
df = spark.createDataFrame(data, ("key", "value"))
df.select(from_json(df.value, "a INT").alias("json")).show()
```

**R**

`from_json`

```R
df <- sql("SELECT named_struct('name', 'Bob') as people")
df <- mutate(df, people_json = to_json(df$people))
head(select(df, from_json(df$people_json, "name STRING")))
```

`structType.character`

```R
structType("a STRING, b INT")
```

`dapply`

```R
dapply(createDataFrame(list(list(1.0)), "a"), function(x) {x}, "a DOUBLE")
```

`gapply`

```R
gapply(createDataFrame(list(list(1.0)), "a"), "a", function(key, x) { x }, "a 
DOUBLE")
```

## How was this patch tested?

Doc tests for `from_json` in Python and unit tests `test_sparkSQL.R` in R.

Author: hyukjinkwon 

Closes #18498 from HyukjinKwon/SPARK-21266.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bfd5acc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bfd5acc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bfd5acc

Branch: refs/heads/master
Commit: 2bfd5accdce2ae31feeeddf213a019cf8ec97663
Parents: 18b3b00
Author: hyukjinkwon 
Authored: Mon Jul 10 10:40:03 2017 -0700
Committer: Felix Cheung 
Committed: Mon Jul 10 10:40:03 2017 -0700

--
 R/pkg/NAMESPACE |   2 +
 R/pkg/R/DataFrame.R |  36 -
 R/pkg/R/functions.R |  12 +-
 R/pkg/R/group.R |   3 +
 R/pkg/R/schema.R|  29 +++-
 R/pkg/tests/fulltests/test_sparkSQL.R   | 136 +++
 python/pyspark/sql/functions.py |  11 +-
 .../scala/org/apache/spark/sql/functions.scala  |   7 +-
 8 files changed, 160 insertions(+), 76 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2bfd5acc/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index b7fdae5..232f5cf 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -429,6 +429,7 @@ export("structField",
"structField.character",
"print.structField",
"structType",
+   "structType.character",
"structType.jobj",
"structType.structField",
"print.structType")
@@ -465,5 +466,6 @@ S3method(print, summary.GBTRegressionModel)
 S3method(print, summary.GBTClassificationModel)
 S3method(structField, character)
 S3method(structField, jobj)
+S3method(structType, character)
 S3method(structType, jobj)
 S3method(structType, structField)

http://git-wip-us.apache.org/repos/asf/spark/blob/2bfd5acc/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 3b9d42d..e7a166c 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1391,6 +1391,10 @@ setMethod("summarize",
   })
 
 dapplyInternal <- function(x, func, schema) {
+  if (is.character(schema)) {
+schema <- structType(schema)
+  }
+
   packageNamesArr <- serialize(.sparkREnv[[".packages"]],
connection = NULL)
 
@@ -1408,6 +1412,8 @@ dapplyInternal <- function(x, func, schema) {
   dataFrame(sdf)
 }
 
+setClassUnion("characterOrstructType", c("character", "structType"))
+
 #' dapply
 #'
 #' Apply a function to each partition of a SparkDataFrame.
@@ -1418,10 +1424,11 @@ dapplyInternal <- function(x, func, schema) {
 #' to each partition will be passed.
 #' The output of func should be a R data.frame.
 #' @param schema The schema of the resulting SparkDataFrame after the function 
is applied.
-#'   It must match the output of func.
+#'   It must match the output of func. Since Spark 2.3, the 
DDL-formatted string
+#'   is also supported for the schema.
 #' @family SparkDataFrame functions
 #' @rdname dapply
-#' @aliases dapply,SparkDataFrame,function,structType-method
+#' @aliases dapply,SparkDataFrame,function,characterOrstructType-method
 #' @name dapply
 #' @seealso \link{dapplyCollect}
 #' @export
@@ -1444,6 +1451,17 @@ dapplyInternal <- function(x, func, schema) {

spark git commit: [SPARK-21272] SortMergeJoin LeftAnti does not update numOutputRows

2017-07-10 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 40fd0ce7f -> a05edf454


[SPARK-21272] SortMergeJoin LeftAnti does not update numOutputRows

## What changes were proposed in this pull request?

Updating numOutputRows metric was missing from one return path of LeftAnti 
SortMergeJoin.

## How was this patch tested?

Non-zero output rows manually seen in metrics.
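
For reference, a minimal way to produce a left-anti SortMergeJoin and inspect the metric 
(assuming a `SparkSession` named `spark`; this snippet is illustrative and not part of the patch):

```scala
import spark.implicits._

// Force a sort-merge join by disabling broadcast joins.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val left  = Seq((1, "a"), (2, "b"), (3, "c")).toDF("a", "v")
val right = Seq(3).toDF("a")

// A left-anti join keeps left rows with no match on the right: here a = 1 and a = 2.
val anti = left.join(right, Seq("a"), "left_anti")
anti.collect()  // the SortMergeJoin node's "number of output rows" metric should now read 2
```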

Author: Juliusz Sompolski 

Closes #18494 from juliuszsompolski/SPARK-21272.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a05edf45
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a05edf45
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a05edf45

Branch: refs/heads/branch-2.2
Commit: a05edf454a67261c89f0f2ecd1fe46bb8cebc257
Parents: 40fd0ce
Author: Juliusz Sompolski 
Authored: Mon Jul 10 09:26:42 2017 -0700
Committer: gatorsmile 
Committed: Mon Jul 10 09:30:55 2017 -0700

--
 .../spark/sql/execution/joins/SortMergeJoinExec.scala   |  1 +
 .../spark/sql/execution/metric/SQLMetricsSuite.scala| 12 
 2 files changed, 13 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a05edf45/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 26fb610..a772015 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -290,6 +290,7 @@ case class SortMergeJoinExec(
 currentLeftRow = smjScanner.getStreamedRow
 val currentRightMatches = smjScanner.getBufferedMatches
 if (currentRightMatches == null || currentRightMatches.length 
== 0) {
+  numOutputRows += 1
   return true
 }
 var found = false

http://git-wip-us.apache.org/repos/asf/spark/blob/a05edf45/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index e5442455..79d1fbf 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -288,6 +288,18 @@ class SQLMetricsSuite extends SparkFunSuite with 
SharedSQLContext {
 }
   }
 
+  test("SortMergeJoin(left-anti) metrics") {
+val anti = testData2.filter("a > 2")
+withTempView("antiData") {
+  anti.createOrReplaceTempView("antiData")
+  val df = spark.sql(
+"SELECT * FROM testData2 ANTI JOIN antiData ON testData2.a = 
antiData.a")
+  testSparkPlanMetrics(df, 1, Map(
+0L -> ("SortMergeJoin", Map("number of output rows" -> 4L)))
+  )
+}
+  }
+
   test("save metrics") {
 withTempPath { file =>
   val previousExecutionIds = 
spark.sharedState.listener.executionIdToData.keySet





spark git commit: [SPARK-21272] SortMergeJoin LeftAnti does not update numOutputRows

2017-07-10 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 6a06c4b03 -> 18b3b00ec


[SPARK-21272] SortMergeJoin LeftAnti does not update numOutputRows

## What changes were proposed in this pull request?

Updating numOutputRows metric was missing from one return path of LeftAnti 
SortMergeJoin.

## How was this patch tested?

Non-zero output rows manually seen in metrics.

Author: Juliusz Sompolski 

Closes #18494 from juliuszsompolski/SPARK-21272.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18b3b00e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18b3b00e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18b3b00e

Branch: refs/heads/master
Commit: 18b3b00ecfde6c694fb6fee4f4d07d04e3d08ccf
Parents: 6a06c4b
Author: Juliusz Sompolski 
Authored: Mon Jul 10 09:26:42 2017 -0700
Committer: gatorsmile 
Committed: Mon Jul 10 09:26:42 2017 -0700

--
 .../spark/sql/execution/joins/SortMergeJoinExec.scala   |  1 +
 .../spark/sql/execution/metric/SQLMetricsSuite.scala| 12 
 2 files changed, 13 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/18b3b00e/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 8445c26..639b8e0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -290,6 +290,7 @@ case class SortMergeJoinExec(
 currentLeftRow = smjScanner.getStreamedRow
 val currentRightMatches = smjScanner.getBufferedMatches
 if (currentRightMatches == null || currentRightMatches.length 
== 0) {
+  numOutputRows += 1
   return true
 }
 var found = false

http://git-wip-us.apache.org/repos/asf/spark/blob/18b3b00e/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index cb3405b..2911cbb 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -483,6 +483,18 @@ class SQLMetricsSuite extends SparkFunSuite with 
SharedSQLContext {
 }
   }
 
+  test("SortMergeJoin(left-anti) metrics") {
+val anti = testData2.filter("a > 2")
+withTempView("antiData") {
+  anti.createOrReplaceTempView("antiData")
+  val df = spark.sql(
+"SELECT * FROM testData2 ANTI JOIN antiData ON testData2.a = 
antiData.a")
+  testSparkPlanMetrics(df, 1, Map(
+0L -> ("SortMergeJoin", Map("number of output rows" -> 4L)))
+  )
+}
+  }
+
   test("save metrics") {
 withTempPath { file =>
   // person creates a temporary view. get the DF before listing previous 
execution IDs





spark git commit: [SPARK-21342] Fix DownloadCallback to work well with RetryingBlockFetcher.

2017-07-10 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 3bfad9d42 -> 40fd0ce7f


[SPARK-21342] Fix DownloadCallback to work well with RetryingBlockFetcher.

When `RetryingBlockFetcher` retries fetching blocks, two `DownloadCallback`s could download the 
same content to the same target file, which could cause `ShuffleBlockFetcherIterator` to read a 
partial result.

This PR proposes to create and delete the tmp files in `OneForOneBlockFetcher`.

Author: jinxing 
Author: Shixiong Zhu 

Closes #18565 from jinxing64/SPARK-21342.

(cherry picked from commit 6a06c4b03c4dd86241fb9d11b4360371488f0e53)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40fd0ce7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40fd0ce7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40fd0ce7

Branch: refs/heads/branch-2.2
Commit: 40fd0ce7f2c2facb96fc5d613bc7b6e4b573d9f7
Parents: 3bfad9d
Author: jinxing 
Authored: Mon Jul 10 21:06:58 2017 +0800
Committer: Wenchen Fan 
Committed: Mon Jul 10 21:10:02 2017 +0800

--
 .../network/shuffle/ExternalShuffleClient.java  |  7 ++--
 .../network/shuffle/OneForOneBlockFetcher.java  | 34 +++---
 .../spark/network/shuffle/ShuffleClient.java| 13 +--
 .../network/shuffle/TempShuffleFileManager.java | 36 
 .../network/sasl/SaslIntegrationSuite.java  |  2 +-
 .../shuffle/OneForOneBlockFetcherSuite.java |  2 +-
 .../spark/network/BlockTransferService.scala|  8 ++---
 .../netty/NettyBlockTransferService.scala   |  9 +++--
 .../storage/ShuffleBlockFetcherIterator.scala   | 28 ++-
 .../spark/storage/BlockManagerSuite.scala   |  5 ++-
 .../ShuffleBlockFetcherIteratorSuite.scala  | 10 +++---
 11 files changed, 108 insertions(+), 46 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/40fd0ce7/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
--
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 269fa72..39af9d5 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.network.shuffle;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -88,15 +87,15 @@ public class ExternalShuffleClient extends ShuffleClient {
   String execId,
   String[] blockIds,
   BlockFetchingListener listener,
-  File[] shuffleFiles) {
+  TempShuffleFileManager tempShuffleFileManager) {
 checkInit();
 logger.debug("External shuffle fetch from {}:{} (executor id {})", host, 
port, execId);
 try {
   RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
   (blockIds1, listener1) -> {
 TransportClient client = clientFactory.createClient(host, port);
-new OneForOneBlockFetcher(client, appId, execId, blockIds1, 
listener1, conf,
-  shuffleFiles).start();
+new OneForOneBlockFetcher(client, appId, execId,
+  blockIds1, listener1, conf, tempShuffleFileManager).start();
   };
 
   int maxRetries = conf.maxIORetries();

http://git-wip-us.apache.org/repos/asf/spark/blob/40fd0ce7/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
--
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
index d46ce2e..2f160d1 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -57,30 +57,36 @@ public class OneForOneBlockFetcher {
   private final String[] blockIds;
   private final BlockFetchingListener listener;
   private final ChunkReceivedCallback chunkCallback;
-  private TransportConf transportConf = null;
-  private File[] shuffleFiles = null;
+  private final TransportConf transportConf;
+  private final TempShuffleFileManager tempShuffleFileManager;
 
   private StreamHandle streamHandle = null;
 
   public 

spark git commit: [SPARK-21342] Fix DownloadCallback to work well with RetryingBlockFetcher.

2017-07-10 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 647963a26 -> 6a06c4b03


[SPARK-21342] Fix DownloadCallback to work well with RetryingBlockFetcher.

## What changes were proposed in this pull request?

When `RetryingBlockFetcher` retries fetching blocks, two `DownloadCallback`s could download the 
same content to the same target file, which could cause `ShuffleBlockFetcherIterator` to read a 
partial result.

This PR proposes to create and delete the tmp files in `OneForOneBlockFetcher`.
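
A self-contained sketch of the idea (names are illustrative, not the actual `TempShuffleFileManager` 
interface added by this patch): each fetch attempt gets its own temporary file from a manager, so a 
retried attempt never clobbers a file another attempt is still writing, and leftovers can be 
cleaned up in one place.

```scala
import java.io.File
import java.nio.file.Files
import scala.collection.mutable

// Illustrative sketch of a per-attempt temp-file manager.
class TempFileManager(dir: File) {
  private val created = mutable.Set.empty[File]

  // Every download attempt asks for a fresh, uniquely named file.
  def createTempFile(): File = synchronized {
    val f = Files.createTempFile(dir.toPath, "shuffle-", ".tmp").toFile
    created += f
    f
  }

  // Called when a fetched file has been handed off; returns false if the caller
  // should delete the file itself because the manager no longer tracks it.
  def registerTempFileToClean(file: File): Boolean = synchronized { created.contains(file) }

  // Delete whatever is left over, e.g. after the task finishes or fails.
  def cleanup(): Unit = synchronized {
    created.foreach(_.delete())
    created.clear()
  }
}
```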

Author: jinxing 
Author: Shixiong Zhu 

Closes #18565 from jinxing64/SPARK-21342.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a06c4b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a06c4b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a06c4b0

Branch: refs/heads/master
Commit: 6a06c4b03c4dd86241fb9d11b4360371488f0e53
Parents: 647963a
Author: jinxing 
Authored: Mon Jul 10 21:06:58 2017 +0800
Committer: Wenchen Fan 
Committed: Mon Jul 10 21:06:58 2017 +0800

--
 .../network/shuffle/ExternalShuffleClient.java  |  7 ++--
 .../network/shuffle/OneForOneBlockFetcher.java  | 34 +++---
 .../spark/network/shuffle/ShuffleClient.java| 13 +--
 .../network/shuffle/TempShuffleFileManager.java | 36 
 .../network/sasl/SaslIntegrationSuite.java  |  2 +-
 .../shuffle/OneForOneBlockFetcherSuite.java |  2 +-
 .../spark/network/BlockTransferService.scala|  8 ++---
 .../netty/NettyBlockTransferService.scala   |  9 +++--
 .../storage/ShuffleBlockFetcherIterator.scala   | 28 ++-
 .../spark/storage/BlockManagerSuite.scala   |  4 +--
 .../ShuffleBlockFetcherIteratorSuite.scala  | 10 +++---
 11 files changed, 108 insertions(+), 45 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6a06c4b0/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
--
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 6ac9302..31bd24e 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.network.shuffle;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -91,15 +90,15 @@ public class ExternalShuffleClient extends ShuffleClient {
   String execId,
   String[] blockIds,
   BlockFetchingListener listener,
-  File[] shuffleFiles) {
+  TempShuffleFileManager tempShuffleFileManager) {
 checkInit();
 logger.debug("External shuffle fetch from {}:{} (executor id {})", host, 
port, execId);
 try {
   RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
   (blockIds1, listener1) -> {
 TransportClient client = clientFactory.createClient(host, port);
-new OneForOneBlockFetcher(client, appId, execId, blockIds1, 
listener1, conf,
-  shuffleFiles).start();
+new OneForOneBlockFetcher(client, appId, execId,
+  blockIds1, listener1, conf, tempShuffleFileManager).start();
   };
 
   int maxRetries = conf.maxIORetries();

http://git-wip-us.apache.org/repos/asf/spark/blob/6a06c4b0/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
--
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
index d46ce2e..2f160d1 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -57,30 +57,36 @@ public class OneForOneBlockFetcher {
   private final String[] blockIds;
   private final BlockFetchingListener listener;
   private final ChunkReceivedCallback chunkCallback;
-  private TransportConf transportConf = null;
-  private File[] shuffleFiles = null;
+  private final TransportConf transportConf;
+  private final TempShuffleFileManager tempShuffleFileManager;
 
   private StreamHandle streamHandle = null;
 
   public OneForOneBlockFetcher(
+TransportClient client,
+String appId,
+String execId,
+

spark git commit: [SPARK-20460][SQL] Make it more consistent to handle column name duplication

2017-07-10 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master c444d1086 -> 647963a26


[SPARK-20460][SQL] Make it more consistent to handle column name duplication

## What changes were proposed in this pull request?
This PR makes the handling of column name duplication more consistent (a sketch of the kind of 
centralized check it introduces appears after the examples of the current behavior below). In the 
current master, error handling differs when hitting column name duplication:
```
// json
scala> val schema = StructType(StructField("a", IntegerType) :: 
StructField("a", IntegerType) :: Nil)
scala> Seq("""{"a":1, "a":1}""").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("json").schema(schema).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: 
a#12, a#13.;
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:181)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:153)

scala> spark.read.format("json").load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Duplicate column(s) : "a" found, cannot 
save to JSON format;
  at 
org.apache.spark.sql.execution.datasources.json.JsonDataSource.checkConstraints(JsonDataSource.scala:81)
  at 
org.apache.spark.sql.execution.datasources.json.JsonDataSource.inferSchema(JsonDataSource.scala:63)
  at 
org.apache.spark.sql.execution.datasources.json.JsonFileFormat.inferSchema(JsonFileFormat.scala:57)
  at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:176)
  at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:176)

// csv
scala> val schema = StructType(StructField("a", IntegerType) :: 
StructField("a", IntegerType) :: Nil)
scala> Seq("a,a", 
"1,1").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("csv").schema(schema).option("header", 
false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: 
a#41, a#42.;
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:181)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:153)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:152)

// If `inferSchema` is true, a CSV format is duplicate-safe (See SPARK-16896)
scala> spark.read.format("csv").option("header", true).load("/tmp/data").show
+---+---+
| a0| a1|
+---+---+
|  1|  1|
+---+---+

// parquet
scala> val schema = StructType(StructField("a", IntegerType) :: 
StructField("a", IntegerType) :: Nil)
scala> Seq((1, 1)).toDF("a", 
"b").coalesce(1).write.mode("overwrite").parquet("/tmp/data")
scala> spark.read.format("parquet").schema(schema).option("header", 
false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: 
a#110, a#111.;
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:181)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:153)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:152)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
```
When this patch is applied, the results change to:
```

// json
scala> val schema = StructType(StructField("a", IntegerType) :: 
StructField("a", IntegerType) :: Nil)
scala> Seq("""{"a":1, 
"a":1}"").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("json").schema(schema).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in 
datasource: "a";
  at 
org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
  at 
org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
  at 
org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
  at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)

scala> spark.read.format("json").load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in 
datasource: "a";
  at 
org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
  at 
org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
  at 
org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
  at 
```
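Both read paths now fail through the shared check in 
org.apache.spark.sql.util.SchemaUtils, as the stack traces above show. As a 
rough illustration, a standalone sketch of that kind of case-sensitivity-aware 
duplicate-name check might look like the following; the object name, signature, 
and message wording here are illustrative assumptions, not the exact Spark 
implementation:

```scala
// Standalone sketch only; the real check lives in org.apache.spark.sql.util.SchemaUtils
// and may differ in signature, error type, and message wording.
object DuplicateColumnCheckSketch {

  def checkColumnNameDuplication(
      columnNames: Seq[String],
      context: String,
      caseSensitive: Boolean): Unit = {
    // Normalize names when the analysis is case-insensitive.
    val normalized = if (caseSensitive) columnNames else columnNames.map(_.toLowerCase)
    val duplicates = normalized.groupBy(identity).collect {
      case (name, occurrences) if occurrences.size > 1 => name
    }
    if (duplicates.nonEmpty) {
      // Spark raises an AnalysisException here; a plain exception keeps the sketch self-contained.
      throw new IllegalArgumentException(
        s"Found duplicate column(s) $context: " +
          duplicates.map(name => "\"" + name + "\"").mkString(", "))
    }
  }

  def main(args: Array[String]): Unit = {
    checkColumnNameDuplication(Seq("a", "A"), "in datasource", caseSensitive = true)   // passes
    checkColumnNameDuplication(Seq("a", "A"), "in datasource", caseSensitive = false)  // throws
  }
}
```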

spark git commit: [MINOR][DOC] Remove obsolete `ec2-scripts.md`

2017-07-10 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 96d58f285 -> c444d1086


[MINOR][DOC] Remove obsolete `ec2-scripts.md`

## What changes were proposed in this pull request?

Since this document became obsolete, we had better remove it for Apache Spark 
2.3.0. The original document was removed via SPARK-12735 in January 2016, and 
currently it is just a redirection page. The only reference on the Apache Spark 
website will point directly to the destination via 
https://github.com/apache/spark-website/pull/54.

## How was this patch tested?

N/A. This is a removal of documentation.

Author: Dongjoon Hyun 

Closes #18578 from dongjoon-hyun/SPARK-REMOVE-EC2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c444d108
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c444d108
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c444d108

Branch: refs/heads/master
Commit: c444d10868c808f4ae43becd5506bf944d9c2e9b
Parents: 96d58f2
Author: Dongjoon Hyun 
Authored: Mon Jul 10 07:46:47 2017 +0100
Committer: Sean Owen 
Committed: Mon Jul 10 07:46:47 2017 +0100

--
 docs/ec2-scripts.md | 7 ---
 1 file changed, 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c444d108/docs/ec2-scripts.md
--
diff --git a/docs/ec2-scripts.md b/docs/ec2-scripts.md
deleted file mode 100644
index 6cd39db..000
--- a/docs/ec2-scripts.md
+++ /dev/null
@@ -1,7 +0,0 @@
----
-layout: global
-title: Running Spark on EC2
-redirect: https://github.com/amplab/spark-ec2#readme
----
-
-This document has been superseded and replaced by documentation at https://github.com/amplab/spark-ec2#readme
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark-website] Git Push Summary

2017-07-10 Thread srowen
Repository: spark-website
Updated Branches:
  refs/heads/remove_ec2 [deleted] 04d5ce051

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21219][CORE] Task retry occurs on same executor due to race condition with blacklisting

2017-07-10 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 0e80ecae3 -> 96d58f285


[SPARK-21219][CORE] Task retry occurs on same executor due to race condition 
with blacklisting

## What changes were proposed in this pull request?

There's a race condition in the current TaskSetManager where a failed task is 
added for retry (addPendingTask) and can asynchronously be assigned to an 
executor *prior* to the blacklist state being updated 
(updateBlacklistForFailedTask); as a result, the task might re-execute on the 
same executor. This is particularly problematic if the executor is shutting 
down, since the retried task immediately becomes a lost task 
(ExecutorLostFailure). Another side effect is that the actual failure reason 
gets obscured by the retried task, which never actually executed. Sample logs 
showing the issue are in https://issues.apache.org/jira/browse/SPARK-21219

The fix is to change the ordering of the addPendingTask and 
updateBlacklistForFailedTask calls in TaskSetManager.handleFailedTask.
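
For intuition, here is a toy, single-threaded model (not Spark code; names are 
illustrative) of why the ordering matters: once the failing executor is 
recorded in the blacklist before the task index is re-queued, a later offer 
from that executor can no longer pick the retried task back up.

```scala
import scala.collection.mutable

// Toy model of the ordering fix; not the actual TaskSetManager logic.
object BlacklistOrderingSketch {
  private val blacklistedExecutors = mutable.Set.empty[String]  // executors excluded for this task set
  private val pendingTasks = mutable.Queue.empty[Int]           // task indices awaiting retry

  // Mirrors the fixed ordering: record the blacklist entry *before*
  // making the failed task schedulable again.
  def handleFailedTask(taskIndex: Int, executorId: String): Unit = {
    blacklistedExecutors += executorId
    pendingTasks.enqueue(taskIndex)
  }

  // A resource offer from a blacklisted executor never receives the retried task.
  def resourceOffer(executorId: String): Option[Int] =
    if (blacklistedExecutors.contains(executorId) || pendingTasks.isEmpty) None
    else Some(pendingTasks.dequeue())

  def main(args: Array[String]): Unit = {
    handleFailedTask(taskIndex = 0, executorId = "exec-1")
    println(resourceOffer("exec-1"))  // None: the failing executor is rejected
    println(resourceOffer("exec-2"))  // Some(0): a different executor picks up the retry
  }
}
```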

## How was this patch tested?

Implemented a unit test that verifies the task is blacklisted before it is 
added to the pending-task list. The test fails without the fix and passes with 
it.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Eric Vandenberg 

Closes #18427 from ericvandenbergfb/blacklistFix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96d58f28
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96d58f28
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96d58f28

Branch: refs/heads/master
Commit: 96d58f285bc98d4c2484150eefe7447db4784a86
Parents: 0e80eca
Author: Eric Vandenberg 
Authored: Mon Jul 10 14:40:20 2017 +0800
Committer: Wenchen Fan 
Committed: Mon Jul 10 14:40:20 2017 +0800

--
 .../apache/spark/scheduler/TaskSetManager.scala | 21 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 44 +++-
 2 files changed, 54 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/96d58f28/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 02d374d..3968fb7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -198,7 +198,7 @@ private[spark] class TaskSetManager(
   private[scheduler] var emittedTaskSizeWarning = false
 
   /** Add a task to all the pending-task lists that it should be on. */
-  private def addPendingTask(index: Int) {
+  private[spark] def addPendingTask(index: Int) {
 for (loc <- tasks(index).preferredLocations) {
   loc match {
 case e: ExecutorCacheTaskLocation =>
@@ -832,15 +832,6 @@ private[spark] class TaskSetManager(
 
 sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, 
info)
 
-if (successful(index)) {
-  logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, but 
the task will not" +
-s" be re-executed (either because the task failed with a shuffle data 
fetch failure," +
-s" so the previous stage needs to be re-run, or because a different 
copy of the task" +
-s" has already succeeded).")
-} else {
-  addPendingTask(index)
-}
-
 if (!isZombie && reason.countTowardsTaskFailures) {
   taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
 info.host, info.executorId, index))
@@ -854,6 +845,16 @@ private[spark] class TaskSetManager(
 return
   }
 }
+
+if (successful(index)) {
+  logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, but 
the task will not" +
+s" be re-executed (either because the task failed with a shuffle data 
fetch failure," +
+s" so the previous stage needs to be re-run, or because a different 
copy of the task" +
+s" has already succeeded).")
+} else {
+  addPendingTask(index)
+}
+
 maybeFinishTaskSet()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/96d58f28/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 80fb674..e46900e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ 

[1/2] spark-website git commit: Recover ec2-scripts.html and remove ec2-scripts.md.

2017-07-10 Thread srowen
Repository: spark-website
Updated Branches:
  refs/heads/remove_ec2 [created] 04d5ce051


Recover ec2-scripts.html and remove ec2-scripts.md.


Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/74622a5c
Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/74622a5c
Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/74622a5c

Branch: refs/heads/remove_ec2
Commit: 74622a5cd3c41c1fa6d8ea336ac003e29502b216
Parents: 878dcfd
Author: Dongjoon Hyun 
Authored: Sun Jul 9 01:46:09 2017 -0700
Committer: Dongjoon Hyun 
Committed: Sun Jul 9 02:24:55 2017 -0700

--
 faq.md   |   2 +-
 site/docs/2.1.1/ec2-scripts.html | 161 ++
 site/docs/2.1.1/ec2-scripts.md   |   7 --
 site/faq.html|   2 +-
 4 files changed, 163 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark-website/blob/74622a5c/faq.md
--
diff --git a/faq.md b/faq.md
index 614664c..c41de64 100644
--- a/faq.md
+++ b/faq.md
@@ -26,7 +26,7 @@ Spark is a fast and general processing engine compatible with 
Hadoop data. It ca
 No. Spark's operators spill data to disk if it does not fit 
in memory, allowing it to run well on any sized data. Likewise, cached datasets 
that do not fit in memory are either spilled to disk or recomputed on the fly 
when needed, as determined by the RDD's storage
 level.
 
 How can I run Spark on a cluster?
-You can use either the standalone deploy 
mode, which only needs Java to be installed on each node, or the Mesos and YARN cluster 
managers. If you'd like to run on Amazon EC2, Spark provides EC2 scripts to 
automatically launch a cluster.
+You can use either the standalone deploy 
mode, which only needs Java to be installed on each node, or the Mesos and YARN cluster 
managers. If you'd like to run on Amazon EC2, Spark provides EC2 scripts to 
automatically launch a cluster.
 
 Note that you can also run Spark locally (possibly on multiple cores) 
without any special setup by just passing local[N] as the master 
URL, where N is the number of parallel threads you want.
 

http://git-wip-us.apache.org/repos/asf/spark-website/blob/74622a5c/site/docs/2.1.1/ec2-scripts.html
--
diff --git a/site/docs/2.1.1/ec2-scripts.html b/site/docs/2.1.1/ec2-scripts.html
new file mode 100644
index 000..320317f
--- /dev/null
+++ b/site/docs/2.1.1/ec2-scripts.html
@@ -0,0 +1,161 @@
+[recovered ec2-scripts.html: an HTML redirect page titled "Running Spark on EC2 - Spark 2.1.1 Documentation" whose meta refresh and canonical link point to https://github.com/amplab/spark-ec2#readme, wrapped in the standard Spark 2.1.1 documentation navigation bar]

spark-website git commit: Use AMPLab direct link in FAQ

2017-07-10 Thread srowen
Repository: spark-website
Updated Branches:
  refs/heads/asf-site 74622a5cd -> 04d5ce051


Use AMPLab direct link in FAQ


Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/04d5ce05
Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/04d5ce05
Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/04d5ce05

Branch: refs/heads/asf-site
Commit: 04d5ce05125b4c2de50dd74765a1ca0c64e72752
Parents: 74622a5
Author: Dongjoon Hyun 
Authored: Sun Jul 9 04:00:40 2017 -0700
Committer: Dongjoon Hyun 
Committed: Sun Jul 9 04:00:40 2017 -0700

--
 faq.md   |   2 +-
 site/docs/2.1.1/ec2-scripts.html | 161 --
 site/faq.html|   2 +-
 3 files changed, 2 insertions(+), 163 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark-website/blob/04d5ce05/faq.md
--
diff --git a/faq.md b/faq.md
index c41de64..f1974c7 100644
--- a/faq.md
+++ b/faq.md
@@ -26,7 +26,7 @@ Spark is a fast and general processing engine compatible with 
Hadoop data. It ca
 No. Spark's operators spill data to disk if it does not fit 
in memory, allowing it to run well on any sized data. Likewise, cached datasets 
that do not fit in memory are either spilled to disk or recomputed on the fly 
when needed, as determined by the RDD's storage
 level.
 
 How can I run Spark on a cluster?
-You can use either the standalone deploy 
mode, which only needs Java to be installed on each node, or the Mesos and YARN cluster 
managers. If you'd like to run on Amazon EC2, Spark provides EC2 scripts to 
automatically launch a cluster.
+You can use either the standalone deploy 
mode, which only needs Java to be installed on each node, or the Mesos and YARN cluster 
managers. If you'd like to run on Amazon EC2, AMPLab provides <a href="https://github.com/amplab/spark-ec2#readme">EC2 scripts</a> to automatically launch a cluster.
 
 Note that you can also run Spark locally (possibly on multiple cores) 
without any special setup by just passing local[N] as the master 
URL, where N is the number of parallel threads you want.
 

http://git-wip-us.apache.org/repos/asf/spark-website/blob/04d5ce05/site/docs/2.1.1/ec2-scripts.html
--
diff --git a/site/docs/2.1.1/ec2-scripts.html b/site/docs/2.1.1/ec2-scripts.html
deleted file mode 100644
index 320317f..000
--- a/site/docs/2.1.1/ec2-scripts.html
+++ /dev/null
@@ -1,161 +0,0 @@
-[removed ec2-scripts.html: an HTML redirect page titled "Running Spark on EC2 - Spark 2.1.1 Documentation" pointing to https://github.com/amplab/spark-ec2#readme, wrapped in the standard Spark 2.1.1 documentation navigation bar]

[2/2] spark-website git commit: Use AMPLab direct link in FAQ

2017-07-10 Thread srowen
Use AMPLab direct link in FAQ


Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/04d5ce05
Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/04d5ce05
Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/04d5ce05

Branch: refs/heads/remove_ec2
Commit: 04d5ce05125b4c2de50dd74765a1ca0c64e72752
Parents: 74622a5
Author: Dongjoon Hyun 
Authored: Sun Jul 9 04:00:40 2017 -0700
Committer: Dongjoon Hyun 
Committed: Sun Jul 9 04:00:40 2017 -0700

--
 faq.md   |   2 +-
 site/docs/2.1.1/ec2-scripts.html | 161 --
 site/faq.html|   2 +-
 3 files changed, 2 insertions(+), 163 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark-website/blob/04d5ce05/faq.md
--
diff --git a/faq.md b/faq.md
index c41de64..f1974c7 100644
--- a/faq.md
+++ b/faq.md
@@ -26,7 +26,7 @@ Spark is a fast and general processing engine compatible with 
Hadoop data. It ca
 No. Spark's operators spill data to disk if it does not fit 
in memory, allowing it to run well on any sized data. Likewise, cached datasets 
that do not fit in memory are either spilled to disk or recomputed on the fly 
when needed, as determined by the RDD's storage
 level.
 
 How can I run Spark on a cluster?
-You can use either the standalone deploy 
mode, which only needs Java to be installed on each node, or the Mesos and YARN cluster 
managers. If you'd like to run on Amazon EC2, Spark provides EC2 scripts to 
automatically launch a cluster.
+You can use either the standalone deploy 
mode, which only needs Java to be installed on each node, or the Mesos and YARN cluster 
managers. If you'd like to run on Amazon EC2, AMPLab provides <a href="https://github.com/amplab/spark-ec2#readme">EC2 scripts</a> to automatically launch a cluster.
 
 Note that you can also run Spark locally (possibly on multiple cores) 
without any special setup by just passing local[N] as the master 
URL, where N is the number of parallel threads you want.
 

http://git-wip-us.apache.org/repos/asf/spark-website/blob/04d5ce05/site/docs/2.1.1/ec2-scripts.html
--
diff --git a/site/docs/2.1.1/ec2-scripts.html b/site/docs/2.1.1/ec2-scripts.html
deleted file mode 100644
index 320317f..000
--- a/site/docs/2.1.1/ec2-scripts.html
+++ /dev/null
@@ -1,161 +0,0 @@
-[removed ec2-scripts.html: an HTML redirect page titled "Running Spark on EC2 - Spark 2.1.1 Documentation" pointing to https://github.com/amplab/spark-ec2#readme, wrapped in the standard Spark 2.1.1 documentation navigation bar]

spark-website git commit: Recover ec2-scripts.html and remove ec2-scripts.md.

2017-07-10 Thread srowen
Repository: spark-website
Updated Branches:
  refs/heads/asf-site 878dcfd84 -> 74622a5cd


Recover ec2-scripts.html and remove ec2-scripts.md.


Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/74622a5c
Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/74622a5c
Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/74622a5c

Branch: refs/heads/asf-site
Commit: 74622a5cd3c41c1fa6d8ea336ac003e29502b216
Parents: 878dcfd
Author: Dongjoon Hyun 
Authored: Sun Jul 9 01:46:09 2017 -0700
Committer: Dongjoon Hyun 
Committed: Sun Jul 9 02:24:55 2017 -0700

--
 faq.md   |   2 +-
 site/docs/2.1.1/ec2-scripts.html | 161 ++
 site/docs/2.1.1/ec2-scripts.md   |   7 --
 site/faq.html|   2 +-
 4 files changed, 163 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark-website/blob/74622a5c/faq.md
--
diff --git a/faq.md b/faq.md
index 614664c..c41de64 100644
--- a/faq.md
+++ b/faq.md
@@ -26,7 +26,7 @@ Spark is a fast and general processing engine compatible with 
Hadoop data. It ca
 No. Spark's operators spill data to disk if it does not fit 
in memory, allowing it to run well on any sized data. Likewise, cached datasets 
that do not fit in memory are either spilled to disk or recomputed on the fly 
when needed, as determined by the RDD's storage
 level.
 
 How can I run Spark on a cluster?
-You can use either the standalone deploy 
mode, which only needs Java to be installed on each node, or the Mesos and YARN cluster 
managers. If you'd like to run on Amazon EC2, Spark provides EC2 scripts to 
automatically launch a cluster.
+You can use either the standalone deploy 
mode, which only needs Java to be installed on each node, or the Mesos and YARN cluster 
managers. If you'd like to run on Amazon EC2, Spark provides EC2 scripts to 
automatically launch a cluster.
 
 Note that you can also run Spark locally (possibly on multiple cores) 
without any special setup by just passing local[N] as the master 
URL, where N is the number of parallel threads you want.
 

http://git-wip-us.apache.org/repos/asf/spark-website/blob/74622a5c/site/docs/2.1.1/ec2-scripts.html
--
diff --git a/site/docs/2.1.1/ec2-scripts.html b/site/docs/2.1.1/ec2-scripts.html
new file mode 100644
index 000..320317f
--- /dev/null
+++ b/site/docs/2.1.1/ec2-scripts.html
@@ -0,0 +1,161 @@
+[recovered ec2-scripts.html: an HTML redirect page titled "Running Spark on EC2 - Spark 2.1.1 Documentation" whose meta refresh and canonical link point to https://github.com/amplab/spark-ec2#readme, wrapped in the standard Spark 2.1.1 documentation navigation bar]