spark git commit: [SPARK-9517][SQL] BytesToBytesMap should encode data the same way as UnsafeExternalSorter

2015-07-31 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 67ad4e21f -> d90f2cf7a


[SPARK-9517][SQL] BytesToBytesMap should encode data the same way as 
UnsafeExternalSorter

BytesToBytesMap currently encodes key/value data in the following format:
```
8B key length, key data, 8B value length, value data
```

UnsafeExternalSorter, on the other hand, encodes data this way:
```
4B record length, data
```

As a result, we cannot pass records encoded by BytesToBytesMap directly into 
UnsafeExternalSorter for sorting. However, if we rearrange data slightly, we 
can then pass the key/value records directly into UnsafeExternalSorter:
```
4B key+value length, 4B key length, key data, value data
```
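
For illustration, a minimal Scala sketch of the new layout. It uses a plain `ByteBuffer` instead of the off-heap Platform/Unsafe writes the map actually performs, and the helper name `encodeRecord` is made up for this example:
```scala
import java.nio.{ByteBuffer, ByteOrder}

// Illustrative only: lay out one record as
// [4B key+value length][4B key length][key bytes][value bytes].
def encodeRecord(key: Array[Byte], value: Array[Byte]): Array[Byte] = {
  val buf = ByteBuffer.allocate(4 + 4 + key.length + value.length)
    .order(ByteOrder.nativeOrder())
  buf.putInt(key.length + value.length)  // total key+value length (the iterator skips 8 + this)
  buf.putInt(key.length)                 // key length, so key and value can still be split apart
  buf.put(key)
  buf.put(value)
  buf.array()
}
```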

Author: Reynold Xin 

Closes #7845 from rxin/kvsort-rebase and squashes the following commits:

5716b59 [Reynold Xin] Fixed test.
2e62ccb [Reynold Xin] Updated BytesToBytesMap's data encoding to put the key 
first.
a51b641 [Reynold Xin] Added a KV sorter interface.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d90f2cf7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d90f2cf7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d90f2cf7

Branch: refs/heads/master
Commit: d90f2cf7a2a1d1e69f9ab385f35f62d4091b5302
Parents: 67ad4e2
Author: Reynold Xin 
Authored: Fri Jul 31 23:55:16 2015 -0700
Committer: Reynold Xin 
Committed: Fri Jul 31 23:55:16 2015 -0700

--
 .../spark/unsafe/map/BytesToBytesMap.java   | 58 
 .../unsafe/sort/UnsafeExternalSorter.java   | 15 
 .../map/AbstractBytesToBytesMapSuite.java   |  6 +-
 .../sql/execution/UnsafeKeyValueSorter.java | 30 
 .../UnsafeFixedWidthAggregationMap.java | 73 ++--
 .../sql/execution/GeneratedAggregate.scala  | 27 +---
 .../UnsafeFixedWidthAggregationMapSuite.scala   | 27 
 .../org/apache/spark/unsafe/KVIterator.java | 29 
 8 files changed, 175 insertions(+), 90 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d90f2cf7/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
--
diff --git 
a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java 
b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 0f42950..481375f 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.unsafe.map;
 
-import java.io.IOException;
 import java.lang.Override;
 import java.lang.UnsupportedOperationException;
 import java.util.Iterator;
@@ -212,7 +211,7 @@ public final class BytesToBytesMap {
*/
   public int numElements() { return numElements; }
 
-  private static final class BytesToBytesMapIterator implements 
Iterator<Location> {
+  public static final class BytesToBytesMapIterator implements 
Iterator<Location> {
 
 private final int numRecords;
 private final Iterator<MemoryBlock> dataPagesIterator;
@@ -222,7 +221,8 @@ public final class BytesToBytesMap {
 private Object pageBaseObject;
 private long offsetInPage;
 
-BytesToBytesMapIterator(int numRecords, Iterator<MemoryBlock> 
dataPagesIterator, Location loc) {
+private BytesToBytesMapIterator(
+int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location loc) 
{
   this.numRecords = numRecords;
   this.dataPagesIterator = dataPagesIterator;
   this.loc = loc;
@@ -244,13 +244,13 @@ public final class BytesToBytesMap {
 
 @Override
 public Location next() {
-  int keyLength = (int) PlatformDependent.UNSAFE.getLong(pageBaseObject, 
offsetInPage);
-  if (keyLength == END_OF_PAGE_MARKER) {
+  int totalLength = PlatformDependent.UNSAFE.getInt(pageBaseObject, 
offsetInPage);
+  if (totalLength == END_OF_PAGE_MARKER) {
 advanceToNextPage();
-keyLength = (int) PlatformDependent.UNSAFE.getLong(pageBaseObject, 
offsetInPage);
+totalLength = PlatformDependent.UNSAFE.getInt(pageBaseObject, 
offsetInPage);
   }
   loc.with(pageBaseObject, offsetInPage);
-  offsetInPage += 8 + 8 + keyLength + loc.getValueLength();
+  offsetInPage += 8 + totalLength;
   currentRecordNumber++;
   return loc;
 }
@@ -269,7 +269,7 @@ public final class BytesToBytesMap {
* If any other lookups or operations are performed on this map while 
iterating over it, including
* `lookup()`, the behavior of the returned iterator is undefined.
*/
-  public Iterator<Location> iterator() {
+  public BytesToBytesMapIterator iterator() {
 return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc);
   }
 
@@ -352,15 +352,18 @@ public final class BytesToBytesMap {
 taskMemoryManager.getOffsetInPage(fullKeyAddress

spark git commit: [SPARK-8232] [SQL] Add sort_array support

2015-07-31 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 3320b0ba2 -> 67ad4e21f


[SPARK-8232] [SQL] Add sort_array support

Add expression `sort_array` support.

Author: Cheng Hao 

This patch had conflicts when merged, resolved by
Committer: Davies Liu 

Closes #7581 from chenghao-intel/sort_array and squashes the following commits:

664c960 [Cheng Hao] update the sort_array by using the ArrayData
276d2d5 [Cheng Hao] add empty line
0edab9c [Cheng Hao] Add asending/descending support for sort_array
80fc0f8 [Cheng Hao] Add type checking
a42b678 [Cheng Hao] Add sort_array support
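
For reference, a minimal Scala usage sketch of the new function, mirroring the Python doctest in the diff below; it assumes a 1.5-era `sqlContext` is already in scope:
```scala
import org.apache.spark.sql.functions.sort_array

// Sort an array column ascending (the default) and descending.
val df = sqlContext.createDataFrame(Seq(
  Tuple1(Seq(2, 1, 3)), Tuple1(Seq(1)), Tuple1(Seq.empty[Int]))).toDF("data")

df.select(sort_array(df("data")).as("r")).show()         // [1,2,3], [1], []
df.select(sort_array(df("data"), false).as("r")).show()  // [3,2,1], [1], []
```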


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67ad4e21
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67ad4e21
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67ad4e21

Branch: refs/heads/master
Commit: 67ad4e21fc68336b0ad6f9a363fb5ebb51f592bf
Parents: 3320b0b
Author: Cheng Hao 
Authored: Fri Jul 31 23:11:22 2015 -0700
Committer: Davies Liu 
Committed: Fri Jul 31 23:11:22 2015 -0700

--
 python/pyspark/sql/functions.py | 20 +
 .../catalyst/analysis/FunctionRegistry.scala|  1 +
 .../expressions/collectionOperations.scala  | 80 +++-
 .../expressions/CollectionFunctionsSuite.scala  | 22 ++
 .../scala/org/apache/spark/sql/functions.scala  | 19 -
 .../spark/sql/DataFrameFunctionsSuite.scala | 51 -
 6 files changed, 186 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/67ad4e21/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 89a2a5c..fb542e6 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -51,6 +51,7 @@ __all__ = [
 'sha1',
 'sha2',
 'size',
+'sort_array',
 'sparkPartitionId',
 'struct',
 'udf',
@@ -570,8 +571,10 @@ def length(col):
 def format_number(col, d):
 """Formats the number X to a format like '#,###,###.##', rounded to d 
decimal places,
and returns the result as a string.
+
 :param col: the column name of the numeric value to be formatted
 :param d: the N decimal places
+
 >>> sqlContext.createDataFrame([(5,)], ['a']).select(format_number('a', 
4).alias('v')).collect()
 [Row(v=u'5.0000')]
 """
@@ -968,6 +971,23 @@ def soundex(col):
 return Column(sc._jvm.functions.size(_to_java_column(col)))
 
 
+@since(1.5)
+def sort_array(col, asc=True):
+"""
+Collection function: sorts the input array for the given column in 
ascending order.
+
+:param col: name of column or expression
+
+>>> df = sqlContext.createDataFrame([([2, 1, 3],),([1],),([],)], ['data'])
+>>> df.select(sort_array(df.data).alias('r')).collect()
+[Row(r=[1, 2, 3]), Row(r=[1]), Row(r=[])]
+>>> df.select(sort_array(df.data, asc=False).alias('r')).collect()
+[Row(r=[3, 2, 1]), Row(r=[1]), Row(r=[])]
+ """
+sc = SparkContext._active_spark_context
+return Column(sc._jvm.functions.sort_array(_to_java_column(col), asc))
+
+
 class UserDefinedFunction(object):
 """
 User defined function in Python

http://git-wip-us.apache.org/repos/asf/spark/blob/67ad4e21/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index ee44cbc..6e14451 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -233,6 +233,7 @@ object FunctionRegistry {
 
 // collection functions
 expression[Size]("size"),
+expression[SortArray]("sort_array"),
 
 // misc functions
 expression[Crc32]("crc32"),

http://git-wip-us.apache.org/repos/asf/spark/blob/67ad4e21/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 1a00dbc..0a53059 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -16,7 +16,10 @@
  */
 package org.apache.spark.sql.catalyst.expres

spark git commit: [SPARK-9415][SQL] Throw AnalysisException when using MapType on Join and Aggregate

2015-07-31 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 14f263448 -> 3320b0ba2


[SPARK-9415][SQL] Throw AnalysisException when using MapType on Join and 
Aggregate

JIRA: https://issues.apache.org/jira/browse/SPARK-9415

Following up #7787. We shouldn't use MapType as grouping keys or join keys either.
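
A minimal Scala sketch of the behavior this enforces (illustrative; assumes a `sqlContext`, and the exact error text comes from the `CheckAnalysis` changes in the diff below):
```scala
import org.apache.spark.sql.AnalysisException

// Grouping (or joining) on a map-typed column should now fail analysis up front.
case class Rec(id: Int, props: Map[String, Int])
val df = sqlContext.createDataFrame(Seq(Rec(1, Map("a" -> 1)), Rec(2, Map("b" -> 2))))

try {
  df.groupBy("props").count().collect()
} catch {
  case e: AnalysisException => println(s"rejected as expected: ${e.getMessage}")
}
```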

Author: Liang-Chi Hsieh 

Closes #7819 from viirya/map_join_groupby and squashes the following commits:

005ee0c [Liang-Chi Hsieh] For comments.
7463398 [Liang-Chi Hsieh] MapType can't be used as join keys, grouping keys.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3320b0ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3320b0ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3320b0ba

Branch: refs/heads/master
Commit: 3320b0ba262159c0c7209ce39b353c93c597077d
Parents: 14f2634
Author: Liang-Chi Hsieh 
Authored: Fri Jul 31 22:26:30 2015 -0700
Committer: Reynold Xin 
Committed: Fri Jul 31 22:26:30 2015 -0700

--
 .../sql/catalyst/analysis/CheckAnalysis.scala   | 16 +++--
 .../catalyst/analysis/AnalysisErrorSuite.scala  | 68 +++-
 .../spark/sql/DataFrameAggregateSuite.scala | 10 ---
 .../scala/org/apache/spark/sql/JoinSuite.scala  |  8 ---
 4 files changed, 77 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3320b0ba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 0ebc3d1..364569d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -92,8 +92,11 @@ trait CheckAnalysis {
   case p: Predicate =>
 
p.asInstanceOf[Expression].children.foreach(checkValidJoinConditionExprs)
   case e if e.dataType.isInstanceOf[BinaryType] =>
-failAnalysis(s"expression ${e.prettyString} in join condition 
" +
-  s"'${condition.prettyString}' can't be binary type.")
+failAnalysis(s"binary type expression ${e.prettyString} cannot 
be used " +
+  "in join conditions")
+  case e if e.dataType.isInstanceOf[MapType] =>
+failAnalysis(s"map type expression ${e.prettyString} cannot be 
used " +
+  "in join conditions")
   case _ => // OK
 }
 
@@ -114,13 +117,16 @@ trait CheckAnalysis {
 
 def checkValidGroupingExprs(expr: Expression): Unit = 
expr.dataType match {
   case BinaryType =>
-failAnalysis(s"grouping expression '${expr.prettyString}' in 
aggregate can " +
-  s"not be binary type.")
+failAnalysis(s"binary type expression ${expr.prettyString} 
cannot be used " +
+  "in grouping expression")
+  case m: MapType =>
+failAnalysis(s"map type expression ${expr.prettyString} cannot 
be used " +
+  "in grouping expression")
   case _ => // OK
 }
 
 aggregateExprs.foreach(checkValidAggregateExpression)
-aggregateExprs.foreach(checkValidGroupingExprs)
+groupingExprs.foreach(checkValidGroupingExprs)
 
   case Sort(orders, _, _) =>
 orders.foreach { order =>

http://git-wip-us.apache.org/repos/asf/spark/blob/3320b0ba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 2588df9..aa19cdc 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -181,7 +181,71 @@ class AnalysisErrorSuite extends SparkFunSuite with 
BeforeAndAfter {
 val error = intercept[AnalysisException] {
   SimpleAnalyzer.checkAnalysis(join)
 }
-error.message.contains("Failure when resolving conflicting references in 
Join")
-error.message.contains("Conflicting attributes")
+assert(error.message.contains("Failure when resolving conflicting 
references in Join"))
+assert(error.message.contains("Conflicting attributes"))
+  }
+
+  test("aggregation can't work on binary and map types") {
+va

spark git commit: [SPARK-9464][SQL] Property checks for UTF8String

2015-07-31 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 6996bd2e8 -> 14f263448


[SPARK-9464][SQL] Property checks for UTF8String

This PR is based on the original work by JoshRosen in #7780, which adds 
ScalaCheck property-based tests for UTF8String.
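
The suite itself lives in `UTF8StringPropertyCheckSuite.scala`; the following is only a minimal sketch of the property-based style it uses, with properties chosen here for illustration (assumes ScalaCheck on the test classpath):
```scala
import java.nio.charset.StandardCharsets
import org.scalacheck.Prop.forAll
import org.scalacheck.Properties
import org.apache.spark.unsafe.types.UTF8String

object UTF8StringSketch extends Properties("UTF8String") {
  // fromString encodes via UTF-8, so numBytes must match the JDK encoder.
  property("numBytes matches UTF-8 encoding") = forAll { (s: String) =>
    UTF8String.fromString(s).numBytes() == s.getBytes(StandardCharsets.UTF_8).length
  }
  // concat just appends bytes, so byte lengths must add up.
  property("concat preserves total byte length") = forAll { (a: String, b: String) =>
    val (ua, ub) = (UTF8String.fromString(a), UTF8String.fromString(b))
    UTF8String.concat(ua, ub).numBytes() == ua.numBytes() + ub.numBytes()
  }
}
```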

Author: Josh Rosen 
Author: Yijie Shen 

Closes #7830 from yjshen/utf8-property-checks and squashes the following 
commits:

593da3a [Yijie Shen] resolve comments
c0800e6 [Yijie Shen] Finish all todos in suite
52f51a0 [Josh Rosen] Add some more failing tests
49ed0697 [Josh Rosen] Rename suite
9209c64 [Josh Rosen] UTF8String Property Checks.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14f26344
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14f26344
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14f26344

Branch: refs/heads/master
Commit: 14f263448471f182123fc84619559df90e7ae52c
Parents: 6996bd2
Author: Josh Rosen 
Authored: Fri Jul 31 21:19:23 2015 -0700
Committer: Reynold Xin 
Committed: Fri Jul 31 21:19:23 2015 -0700

--
 unsafe/pom.xml  |  10 +
 .../apache/spark/unsafe/types/UTF8String.java   |  19 +-
 .../spark/unsafe/types/UTF8StringSuite.java |  13 +-
 .../types/UTF8StringPropertyCheckSuite.scala| 249 +++
 4 files changed, 280 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/14f26344/unsafe/pom.xml
--
diff --git a/unsafe/pom.xml b/unsafe/pom.xml
index 33782c6..89475ee 100644
--- a/unsafe/pom.xml
+++ b/unsafe/pom.xml
@@ -70,6 +70,16 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

http://git-wip-us.apache.org/repos/asf/spark/blob/14f26344/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
--
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java 
b/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
index 2561c1c..f6dafe9 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
@@ -301,10 +301,9 @@ public final class UTF8String implements 
Comparable<UTF8String>, Serializable {
 int s = 0;
 int e = this.numBytes - 1;
 // skip all of the space (0x20) in the left side
-while (s < this.numBytes && getByte(s) == 0x20) s++;
+while (s < this.numBytes && getByte(s) <= 0x20 && getByte(s) >= 0x00) s++;
 // skip all of the space (0x20) in the right side
-while (e >= 0 && getByte(e) == 0x20) e--;
-
+while (e >= 0 && getByte(e) <= 0x20 && getByte(e) >= 0x00) e--;
 if (s > e) {
   // empty string
   return UTF8String.fromBytes(new byte[0]);
@@ -316,7 +315,7 @@ public final class UTF8String implements 
Comparable<UTF8String>, Serializable {
   public UTF8String trimLeft() {
 int s = 0;
 // skip all of the space (0x20) in the left side
-while (s < this.numBytes && getByte(s) == 0x20) s++;
+while (s < this.numBytes && getByte(s) <= 0x20 && getByte(s) >= 0x00) s++;
 if (s == this.numBytes) {
   // empty string
   return UTF8String.fromBytes(new byte[0]);
@@ -328,7 +327,7 @@ public final class UTF8String implements 
Comparable<UTF8String>, Serializable {
   public UTF8String trimRight() {
 int e = numBytes - 1;
 // skip all of the space (0x20) in the right side
-while (e >= 0 && getByte(e) == 0x20) e--;
+while (e >= 0 && getByte(e) <= 0x20 && getByte(e) >= 0x00) e--;
 
 if (e < 0) {
   // empty string
@@ -354,7 +353,7 @@ public final class UTF8String implements 
Comparable<UTF8String>, Serializable {
   }
 
   public UTF8String repeat(int times) {
-if (times <=0) {
+if (times <= 0) {
   return EMPTY_UTF8;
 }
 
@@ -492,7 +491,7 @@ public final class UTF8String implements 
Comparable<UTF8String>, Serializable {
*/
   public UTF8String rpad(int len, UTF8String pad) {
 int spaces = len - this.numChars(); // number of char need to pad
-if (spaces <= 0) {
+if (spaces <= 0 || pad.numBytes() == 0) {
   // no padding at all, return the substring of the current string
   return substring(0, len);
 } else {
@@ -507,7 +506,7 @@ public final class UTF8String implements 
Comparable<UTF8String>, Serializable {
   int idx = 0;
   while (idx < count) {
 copyMemory(pad.base, pad.offset, data, BYTE_ARRAY_OFFSET + offset, 
pad.numBytes);
-++idx;
+++ idx;
 offset += pad.numBytes;
   }
   copyMemory(remain.base, remain.offset, data, BYTE_ARRAY_OFFSET + offset, 
remain.numB

spark git commit: [SPARK-8264][SQL]add substring_index function

2015-07-31 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 03377d252 -> 6996bd2e8


[SPARK-8264][SQL]add substring_index function

This PR is based on #7533 , thanks to zhichao-li

Closes #7533

Author: zhichao.li 
Author: Davies Liu 

Closes #7843 from davies/str_index and squashes the following commits:

391347b [Davies Liu] add python api
3ce7802 [Davies Liu] fix substringIndex
f2d29a1 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
str_index
515519b [zhichao.li] add foldable and remove null checking
9546991 [zhichao.li] scala style
67c253a [zhichao.li] hide some apis and clean code
b19b013 [zhichao.li] add codegen and clean code
ac863e9 [zhichao.li] reduce the calling of numChars
12e108f [zhichao.li] refine unittest
d92951b [zhichao.li] add lastIndexOf
52d7b03 [zhichao.li] add substring_index function


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6996bd2e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6996bd2e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6996bd2e

Branch: refs/heads/master
Commit: 6996bd2e81bf6597dcda499d9a9a80927a43e30f
Parents: 03377d2
Author: zhichao.li 
Authored: Fri Jul 31 21:18:01 2015 -0700
Committer: Reynold Xin 
Committed: Fri Jul 31 21:18:01 2015 -0700

--
 python/pyspark/sql/functions.py | 19 +
 .../catalyst/analysis/FunctionRegistry.scala|  1 +
 .../catalyst/expressions/stringOperations.scala | 25 ++
 .../expressions/StringExpressionsSuite.scala| 31 
 .../scala/org/apache/spark/sql/functions.scala  | 12 ++-
 .../apache/spark/sql/StringFunctionsSuite.scala | 57 ++
 .../apache/spark/unsafe/types/UTF8String.java   | 80 +++-
 .../spark/unsafe/types/UTF8StringSuite.java | 38 ++
 8 files changed, 261 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6996bd2e/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index bb9926c..89a2a5c 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -921,6 +921,25 @@ def trunc(date, format):
 
 
 @since(1.5)
+@ignore_unicode_prefix
+def substring_index(str, delim, count):
+"""
+Returns the substring from string str before count occurrences of the 
delimiter delim.
+If count is positive, everything to the left of the final delimiter (counting 
from left) is
+returned. If count is negative, everything to the right of the final delimiter 
(counting from the
+right) is returned. substring_index performs a case-sensitive match when 
searching for delim.
+
+>>> df = sqlContext.createDataFrame([('a.b.c.d',)], ['s'])
+>>> df.select(substring_index(df.s, '.', 2).alias('s')).collect()
+[Row(s=u'a.b')]
+>>> df.select(substring_index(df.s, '.', -3).alias('s')).collect()
+[Row(s=u'b.c.d')]
+"""
+sc = SparkContext._active_spark_context
+return Column(sc._jvm.functions.substring_index(_to_java_column(str), 
delim, count))
+
+
+@since(1.5)
 def size(col):
 """
 Collection function: returns the length of the array or map stored in the 
column.
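
A Scala counterpart to the doctest above, for reference (illustrative; assumes a `sqlContext`):
```scala
import org.apache.spark.sql.functions.substring_index

// Positive count keeps everything left of the count-th delimiter;
// negative count keeps everything right of it, counting from the right.
val df = sqlContext.createDataFrame(Seq(Tuple1("a.b.c.d"))).toDF("s")
df.select(substring_index(df("s"), ".", 2)).show()   // a.b
df.select(substring_index(df("s"), ".", -3)).show()  // b.c.d
```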

http://git-wip-us.apache.org/repos/asf/spark/blob/6996bd2e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 3f61a9a..ee44cbc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -199,6 +199,7 @@ object FunctionRegistry {
 expression[StringSplit]("split"),
 expression[Substring]("substr"),
 expression[Substring]("substring"),
+expression[SubstringIndex]("substring_index"),
 expression[StringTrim]("trim"),
 expression[UnBase64]("unbase64"),
 expression[Upper]("ucase"),

http://git-wip-us.apache.org/repos/asf/spark/blob/6996bd2e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
index 160e72f..5dd387a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/

spark git commit: [SPARK-9358][SQL] Code generation for UnsafeRow joiner.

2015-07-31 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 712f5b7a9 -> 03377d252


[SPARK-9358][SQL] Code generation for UnsafeRow joiner.

This patch creates a code generated unsafe row concatenator that can be used to 
concatenate/join two UnsafeRows into a single UnsafeRow.

Since such low-level code is inherently hard to test, the test suites rely heavily on 
randomized testing to guarantee correctness.
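
As a sketch of how the generated joiner is meant to be used (method names as I read them from `GenerateUnsafeRowJoiner` in this patch; treat them as assumptions rather than a stable API):
```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
import org.apache.spark.sql.types._

// Generate a joiner for a pair of schemas once, then concatenate row pairs cheaply.
val leftSchema  = StructType(StructField("k", LongType) :: Nil)
val rightSchema = StructType(StructField("v", IntegerType) :: StructField("w", LongType) :: Nil)
val joiner = GenerateUnsafeRowJoiner.create(leftSchema, rightSchema)

def concat(left: UnsafeRow, right: UnsafeRow): UnsafeRow = joiner.join(left, right)
```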

Author: Reynold Xin 

Closes #7821 from rxin/rowconcat and squashes the following commits:

8717f35 [Reynold Xin] Rebase and code review.
72c5d8e [Reynold Xin] Fixed a bug.
a84ed2e [Reynold Xin] Fixed offset.
40c3fb2 [Reynold Xin] Reset random data generator.
f0913aa [Reynold Xin] Test fixes.
6687b6f [Reynold Xin] Updated documentation.
00354b9 [Reynold Xin] Support concat data as well.
e9a4347 [Reynold Xin] Updated.
6269f96 [Reynold Xin] Fixed a bug .
0f89716 [Reynold Xin] [SPARK-9358][SQL][WIP] Code generation for UnsafeRow 
concat.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03377d25
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03377d25
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03377d25

Branch: refs/heads/master
Commit: 03377d2522776267a07b7d6ae9bddf79a4e0f516
Parents: 712f5b7
Author: Reynold Xin 
Authored: Fri Jul 31 21:09:00 2015 -0700
Committer: Reynold Xin 
Committed: Fri Jul 31 21:09:00 2015 -0700

--
 .../sql/catalyst/expressions/UnsafeRow.java |  19 ++
 .../expressions/codegen/CodeGenerator.scala |   2 +
 .../codegen/GenerateUnsafeProjection.scala  |   6 +-
 .../codegen/GenerateUnsafeRowJoiner.scala   | 241 +++
 .../apache/spark/sql/RandomDataGenerator.scala  |  15 +-
 .../GenerateUnsafeRowJoinerBitsetSuite.scala| 147 +++
 .../codegen/GenerateUnsafeRowJoinerSuite.scala  | 114 +
 .../UnsafeFixedWidthAggregationMap.java |   7 +-
 .../spark/sql/execution/TungstenSortSuite.scala |   3 +
 9 files changed, 544 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/03377d25/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
--
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index e7088ed..24dc80b 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -85,6 +85,14 @@ public final class UnsafeRow extends MutableRow {
 })));
   }
 
+  public static boolean isFixedLength(DataType dt) {
+if (dt instanceof DecimalType) {
+  return ((DecimalType) dt).precision() < Decimal.MAX_LONG_DIGITS();
+} else {
+  return settableFieldTypes.contains(dt);
+}
+  }
+
   
//
   // Private fields and methods
   
//
@@ -144,6 +152,17 @@ public final class UnsafeRow extends MutableRow {
 this.sizeInBytes = sizeInBytes;
   }
 
+  /**
+   * Update this UnsafeRow to point to the underlying byte array.
+   *
+   * @param buf byte array to point to
+   * @param numFields the number of fields in this row
+   * @param sizeInBytes the number of bytes valid in the byte array
+   */
+  public void pointTo(byte[] buf, int numFields, int sizeInBytes) {
+pointTo(buf, PlatformDependent.BYTE_ARRAY_OFFSET, numFields, sizeInBytes);
+  }
+
   private void assertIndexIsValid(int index) {
 assert index >= 0 : "index (" + index + ") should >= 0";
 assert index < numFields : "index (" + index + ") should < " + numFields;

http://git-wip-us.apache.org/repos/asf/spark/blob/03377d25/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index e50ec27..36f4e9c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -27,6 +27,7 @@ import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.PlatformDependent
 import org.apache.spark.u

spark git commit: [SPARK-9318] [SPARK-9320] [SPARKR] Aliases for merge and summary functions on DataFrames

2015-07-31 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master 8cb415a4b -> 712f5b7a9


[SPARK-9318] [SPARK-9320] [SPARKR] Aliases for merge and summary functions on 
DataFrames

This PR adds synonyms for ```merge``` and ```summary``` in the SparkR DataFrame API.

cc shivaram

Author: Hossein 

Closes #7806 from falaki/SPARK-9320 and squashes the following commits:

72600f7 [Hossein] Updated docs
92a6e75 [Hossein] Fixed merge generic signature issue
4c2b051 [Hossein] Fixing naming with mllib summary
0f3a64c [Hossein] Added ... to generic for merge
30fbaf8 [Hossein] Merged master
ae1a4cf [Hossein] Merge branch 'master' into SPARK-9320
e8eb86f [Hossein] Add a generic for merge
fc01f2d [Hossein] Added unit test
8d92012 [Hossein] Added merge as an alias for join
5b8bedc [Hossein] Added unit test
632693d [Hossein] Added summary as an alias for describe for DataFrame


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/712f5b7a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/712f5b7a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/712f5b7a

Branch: refs/heads/master
Commit: 712f5b7a9ab52c26e3d086629633950ec2fb7afc
Parents: 8cb415a
Author: Hossein 
Authored: Fri Jul 31 19:24:00 2015 -0700
Committer: Shivaram Venkataraman 
Committed: Fri Jul 31 19:24:44 2015 -0700

--
 R/pkg/NAMESPACE  |  2 ++
 R/pkg/R/DataFrame.R  | 22 ++
 R/pkg/R/generics.R   |  8 
 R/pkg/R/mllib.R  |  8 
 R/pkg/inst/tests/test_sparkSQL.R | 14 --
 5 files changed, 48 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/712f5b7a/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index ff116cb..b2d92bd 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -46,6 +46,7 @@ exportMethods("arrange",
   "isLocal",
   "join",
   "limit",
+  "merge",
   "names",
   "ncol",
   "nrow",
@@ -69,6 +70,7 @@ exportMethods("arrange",
   "show",
   "showDF",
   "summarize",
+  "summary",
   "take",
   "unionAll",
   "unique",

http://git-wip-us.apache.org/repos/asf/spark/blob/712f5b7a/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index b4065d2..8956032 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1279,6 +1279,15 @@ setMethod("join",
 dataFrame(sdf)
   })
 
+#' @rdname merge
+#' @aliases join
+setMethod("merge",
+  signature(x = "DataFrame", y = "DataFrame"),
+  function(x, y, joinExpr = NULL, joinType = NULL, ...) {
+join(x, y, joinExpr, joinType)
+  })
+
+
 #' UnionAll
 #'
 #' Return a new DataFrame containing the union of rows in this DataFrame
@@ -1524,6 +1533,19 @@ setMethod("describe",
 dataFrame(sdf)
   })
 
+#' @title Summary
+#'
+#' @description Computes statistics for numeric columns of the DataFrame
+#'
+#' @rdname summary
+#' @aliases describe
+setMethod("summary",
+  signature(x = "DataFrame"),
+  function(x) {
+describe(x)
+  })
+
+
 #' dropna
 #'
 #' Returns a new DataFrame omitting rows with null values.

http://git-wip-us.apache.org/repos/asf/spark/blob/712f5b7a/R/pkg/R/generics.R
--
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 71d1e34..c43b947 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -461,6 +461,10 @@ setGeneric("isLocal", function(x) { 
standardGeneric("isLocal") })
 #' @export
 setGeneric("limit", function(x, num) {standardGeneric("limit") })
 
+#' @rdname merge
+#' @export
+setGeneric("merge")
+
 #' @rdname withColumn
 #' @export
 setGeneric("mutate", function(x, ...) {standardGeneric("mutate") })
@@ -531,6 +535,10 @@ setGeneric("showDF", function(x,...) { 
standardGeneric("showDF") })
 #' @export
 setGeneric("summarize", function(x,...) { standardGeneric("summarize") })
 
+#' @rdname summary
+#' @export
+setGeneric("summary", function(x, ...) { standardGeneric("summary") })
+
 # @rdname tojson
 # @export
 setGeneric("toJSON", function(x) { standardGeneric("toJSON") })

http://git-wip-us.apache.org/repos/asf/spark/blob/712f5b7a/R/pkg/R/mllib.R
--
diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index efddcc1..b524d1f 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -86,12 +86,12 @@ setMethod("predict", signature(object = "PipelineModel"),
 #' model <- glm(y ~ x, train

[2/2] spark git commit: [SPARK-9451] [SQL] Support entries larger than default page size in BytesToBytesMap & integrate with ShuffleMemoryManager

2015-07-31 Thread joshrosen
[SPARK-9451] [SQL] Support entries larger than default page size in 
BytesToBytesMap & integrate with ShuffleMemoryManager

This patch adds support for entries larger than the default page size in 
BytesToBytesMap.  These large rows are handled by allocating special overflow 
pages to hold individual entries.

In addition, this patch integrates BytesToBytesMap with the 
ShuffleMemoryManager:

- Move BytesToBytesMap from `unsafe` to `core` so that it can import 
`ShuffleMemoryManager`.
- Before allocating new data pages, ask the ShuffleMemoryManager to reserve the 
memory:
  - `putNewKey()` now returns a boolean to indicate whether the insert 
succeeded or failed due to a lack of memory.  The caller can use this value to 
respond to the memory pressure (e.g. by spilling).
- `UnsafeFixedWidthAggregationMap.getAggregationBuffer()` now returns `null` 
to signal failure due to a lack of memory (see the caller-side sketch after this list).
- Updated all uses of these classes to handle these error conditions.
- Added new tests for allocating large records and for allocations which fail 
due to memory pressure.
- Extended the `afterAll()` test teardown methods to detect 
ShuffleMemoryManager leaks.
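
A caller-side sketch of the new failure signal (illustrative; `aggregationMap` and `groupKey` are assumed to be in scope, and the exact recovery policy is up to the calling operator):
```scala
import java.io.IOException

// After this patch, a null buffer means the map could not reserve memory
// from the ShuffleMemoryManager; the caller must react (spill, fall back, or fail).
val buffer = aggregationMap.getAggregationBuffer(groupKey)
if (buffer == null) {
  throw new IOException("Could not allocate memory to grow the aggregation map")
} else {
  buffer.setLong(0, buffer.getLong(0) + 1L)  // update the aggregation buffer in place
}
```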

Author: Josh Rosen 

Closes #7762 from JoshRosen/large-rows and squashes the following commits:

ae7bc56 [Josh Rosen] Fix compilation
82fc657 [Josh Rosen] Merge remote-tracking branch 'origin/master' into 
large-rows
34ab943 [Josh Rosen] Remove semi
31a525a [Josh Rosen] Integrate BytesToBytesMap with ShuffleMemoryManager.
626b33c [Josh Rosen] Move code to sql/core and spark/core packages so that 
ShuffleMemoryManager can be integrated
ec4484c [Josh Rosen] Move BytesToBytesMap from unsafe package to core.
642ed69 [Josh Rosen] Rename size to numElements
bea1152 [Josh Rosen] Add basic test.
2cd3570 [Josh Rosen] Remove accidental duplicated code
07ff9ef [Josh Rosen] Basic support for large rows in BytesToBytesMap.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8cb415a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8cb415a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8cb415a4

Branch: refs/heads/master
Commit: 8cb415a4b9bc1f82127ccce4a5579d433f4e8f83
Parents: f51fd6f
Author: Josh Rosen 
Authored: Fri Jul 31 19:19:27 2015 -0700
Committer: Josh Rosen 
Committed: Fri Jul 31 19:19:27 2015 -0700

--
 .../spark/unsafe/map/BytesToBytesMap.java   | 709 +++
 .../spark/unsafe/map/HashMapGrowthStrategy.java |  41 ++
 .../spark/shuffle/ShuffleMemoryManager.scala|   8 +-
 .../map/AbstractBytesToBytesMapSuite.java   | 499 +
 .../unsafe/map/BytesToBytesMapOffHeapSuite.java |  29 +
 .../unsafe/map/BytesToBytesMapOnHeapSuite.java  |  29 +
 .../UnsafeFixedWidthAggregationMap.java | 223 --
 .../UnsafeFixedWidthAggregationMapSuite.scala   | 132 
 .../UnsafeFixedWidthAggregationMap.java | 234 ++
 .../sql/execution/GeneratedAggregate.scala  |   6 +
 .../sql/execution/joins/HashedRelation.scala|  27 +-
 .../UnsafeFixedWidthAggregationMapSuite.scala   | 140 
 .../spark/unsafe/map/BytesToBytesMap.java   | 643 -
 .../spark/unsafe/map/HashMapGrowthStrategy.java |  41 --
 .../apache/spark/unsafe/memory/MemoryBlock.java |   2 +-
 .../spark/unsafe/memory/TaskMemoryManager.java  |   1 +
 .../map/AbstractBytesToBytesMapSuite.java   | 385 --
 .../unsafe/map/BytesToBytesMapOffHeapSuite.java |  29 -
 .../unsafe/map/BytesToBytesMapOnHeapSuite.java  |  29 -
 19 files changed, 1717 insertions(+), 1490 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8cb415a4/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
--
diff --git 
a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java 
b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
new file mode 100644
index 000..0f42950
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -0,0 +1,709 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permission

[1/2] spark git commit: [SPARK-9451] [SQL] Support entries larger than default page size in BytesToBytesMap & integrate with ShuffleMemoryManager

2015-07-31 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master f51fd6fbb -> 8cb415a4b


http://git-wip-us.apache.org/repos/asf/spark/blob/8cb415a4/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
new file mode 100644
index 000..79fd52d
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.shuffle.ShuffleMemoryManager
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, 
TaskMemoryManager}
+import org.apache.spark.unsafe.types.UTF8String
+
+
+class UnsafeFixedWidthAggregationMapSuite
+  extends SparkFunSuite
+  with Matchers
+  with BeforeAndAfterEach {
+
+  import UnsafeFixedWidthAggregationMap._
+
+  private val groupKeySchema = StructType(StructField("product", StringType) 
:: Nil)
+  private val aggBufferSchema = StructType(StructField("salePrice", 
IntegerType) :: Nil)
+  private def emptyAggregationBuffer: InternalRow = InternalRow(0)
+  private val PAGE_SIZE_BYTES: Long = 1L << 26; // 64 megabytes
+
+  private var taskMemoryManager: TaskMemoryManager = null
+  private var shuffleMemoryManager: ShuffleMemoryManager = null
+
+  override def beforeEach(): Unit = {
+taskMemoryManager = new TaskMemoryManager(new 
ExecutorMemoryManager(MemoryAllocator.HEAP))
+shuffleMemoryManager = new ShuffleMemoryManager(Long.MaxValue)
+  }
+
+  override def afterEach(): Unit = {
+if (taskMemoryManager != null) {
+  val leakedShuffleMemory = 
shuffleMemoryManager.getMemoryConsumptionForThisTask
+  assert(taskMemoryManager.cleanUpAllAllocatedMemory() === 0)
+  assert(leakedShuffleMemory === 0)
+  taskMemoryManager = null
+}
+  }
+
+  test("supported schemas") {
+assert(supportsAggregationBufferSchema(
+  StructType(StructField("x", DecimalType.USER_DEFAULT) :: Nil)))
+assert(!supportsAggregationBufferSchema(
+  StructType(StructField("x", DecimalType.SYSTEM_DEFAULT) :: Nil)))
+assert(!supportsAggregationBufferSchema(StructType(StructField("x", 
StringType) :: Nil)))
+assert(
+  !supportsAggregationBufferSchema(StructType(StructField("x", 
ArrayType(IntegerType)) :: Nil)))
+  }
+
+  test("empty map") {
+val map = new UnsafeFixedWidthAggregationMap(
+  emptyAggregationBuffer,
+  aggBufferSchema,
+  groupKeySchema,
+  taskMemoryManager,
+  shuffleMemoryManager,
+  1024, // initial capacity,
+  PAGE_SIZE_BYTES,
+  false // disable perf metrics
+)
+assert(!map.iterator().hasNext)
+map.free()
+  }
+
+  test("updating values for a single key") {
+val map = new UnsafeFixedWidthAggregationMap(
+  emptyAggregationBuffer,
+  aggBufferSchema,
+  groupKeySchema,
+  taskMemoryManager,
+  shuffleMemoryManager,
+  1024, // initial capacity
+  PAGE_SIZE_BYTES,
+  false // disable perf metrics
+)
+val groupKey = InternalRow(UTF8String.fromString("cats"))
+
+// Looking up a key stores a zero-entry in the map (like Python Counters 
or DefaultDicts)
+assert(map.getAggregationBuffer(groupKey) != null)
+val iter = map.iterator()
+val entry = iter.next()
+assert(!iter.hasNext)
+entry.key.getString(0) should be ("cats")
+entry.value.getInt(0) should be (0)
+
+// Modifications to rows retrieved from the map should update the values 
in the map
+entry.value.setInt(0, 42)
+map.getAggregationBuffer(groupKey).getInt(0) should be (42)
+
+map.free()
+  }
+
+  test("inserting large random keys") {
+val map = new Un

spark git commit: [SPARK-8936] [MLLIB] OnlineLDA document-topic Dirichlet hyperparameter optimization

2015-07-31 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 4d5a6e7b6 -> f51fd6fbb


[SPARK-8936] [MLLIB] OnlineLDA document-topic Dirichlet hyperparameter 
optimization

Adds `alpha` (document-topic Dirichlet parameter) hyperparameter optimization 
to `OnlineLDAOptimizer` following Huang: Maximum Likelihood Estimation of 
Dirichlet Distribution Parameters. Also introduces a private 
`setSampleWithReplacement` to `OnlineLDAOptimizer` for unit testing purposes.
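
For context, a sketch of the Newton step used in this style of Dirichlet MLE (after Huang/Minka); in the online variational LDA setting, log p̄_dk is replaced by E_q[log θ_dk] = Ψ(γ_dk) − Ψ(Σ_j γ_dj), and the exact weighting in `OnlineLDAOptimizer` may differ:
```
% Gradient and structured Hessian of the Dirichlet log-likelihood over D documents,
% with \bar p_{dk} the (expected) topic proportions of document d:
g_k = D\left(\Psi\Big(\sum_j \alpha_j\Big) - \Psi(\alpha_k)\right) + \sum_{d=1}^{D} \log \bar p_{dk},
\qquad q_k = -D\,\Psi'(\alpha_k), \qquad z = D\,\Psi'\Big(\sum_j \alpha_j\Big)

% Newton update exploiting H = \mathrm{diag}(q) + z\,\mathbf{1}\mathbf{1}^{\top}:
b = \frac{\sum_j g_j / q_j}{1/z + \sum_j 1/q_j},
\qquad \alpha_k \leftarrow \alpha_k - \frac{g_k - b}{q_k}
```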

Author: Feynman Liang 

Closes #7836 from feynmanliang/SPARK-8936-alpha-optimize and squashes the 
following commits:

4bef484 [Feynman Liang] Documentation improvements
c3c6c1d [Feynman Liang] Fix docs
151e859 [Feynman Liang] Fix style
fa77518 [Feynman Liang] Hyperparameter optimization


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f51fd6fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f51fd6fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f51fd6fb

Branch: refs/heads/master
Commit: f51fd6fbb4d9822502f98b312251e317d757bc3a
Parents: 4d5a6e7
Author: Feynman Liang 
Authored: Fri Jul 31 18:36:22 2015 -0700
Committer: Joseph K. Bradley 
Committed: Fri Jul 31 18:36:22 2015 -0700

--
 .../spark/mllib/clustering/LDAOptimizer.scala   | 75 +---
 .../spark/mllib/clustering/LDASuite.scala   | 34 +
 2 files changed, 99 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f51fd6fb/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index d6f8b29..b0e14cb 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -19,8 +19,8 @@ package org.apache.spark.mllib.clustering
 
 import java.util.Random
 
-import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, normalize, sum}
-import breeze.numerics.{abs, exp}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, all, normalize, 
sum}
+import breeze.numerics.{trigamma, abs, exp}
 import breeze.stats.distributions.{Gamma, RandBasis}
 
 import org.apache.spark.annotation.DeveloperApi
@@ -239,22 +239,26 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   /** alias for docConcentration */
   private var alpha: Vector = Vectors.dense(0)
 
-  /** (private[clustering] for debugging)  Get docConcentration */
+  /** (for debugging)  Get docConcentration */
   private[clustering] def getAlpha: Vector = alpha
 
   /** alias for topicConcentration */
   private var eta: Double = 0
 
-  /** (private[clustering] for debugging)  Get topicConcentration */
+  /** (for debugging)  Get topicConcentration */
   private[clustering] def getEta: Double = eta
 
   private var randomGenerator: java.util.Random = null
 
+  /** (for debugging) Whether to sample mini-batches with replacement. 
(default = true) */
+  private var sampleWithReplacement: Boolean = true
+
   // Online LDA specific parameters
   // Learning rate is: (tau0 + t)^{-kappa}
   private var tau0: Double = 1024
   private var kappa: Double = 0.51
   private var miniBatchFraction: Double = 0.05
+  private var optimizeAlpha: Boolean = false
 
   // internal data structure
   private var docs: RDD[(Long, Vector)] = null
@@ -262,7 +266,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   /** Dirichlet parameter for the posterior over topics */
   private var lambda: BDM[Double] = null
 
-  /** (private[clustering] for debugging) Get parameter for topics */
+  /** (for debugging) Get parameter for topics */
   private[clustering] def getLambda: BDM[Double] = lambda
 
   /** Current iteration (count of invocations of [[next()]]) */
@@ -325,7 +329,22 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   }
 
   /**
-   * (private[clustering])
+   * Optimize alpha, indicates whether alpha (Dirichlet parameter for 
document-topic distribution)
+   * will be optimized during training.
+   */
+  def getOptimizeAlpha: Boolean = this.optimizeAlpha
+
+  /**
+   * Sets whether to optimize alpha parameter during training.
+   *
+   * Default: false
+   */
+  def setOptimizeAlpha(optimizeAlpha: Boolean): this.type = {
+this.optimizeAlpha = optimizeAlpha
+this
+  }
+
+  /**
* Set the Dirichlet parameter for the posterior over topics.
* This is only used for testing now. In the future, it can help support 
training stop/resume.
*/
@@ -335,7 +354,6 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   }
 
   /**
-   * (private[clustering])
* Used for random initialization of the variationa

spark git commit: [SPARK-8271][SQL]string function: soundex

2015-07-31 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 3fc0cb920 -> 4d5a6e7b6


 [SPARK-8271][SQL]string function: soundex

This PR brings SQL function soundex(), see 
https://issues.apache.org/jira/browse/HIVE-9738

It's based on #7115 , thanks to HuJiayin
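
The Scala side (`functions.soundex`, added in this patch) can be exercised the same way as the Python doctest below; a minimal sketch, assuming a `sqlContext`:
```scala
import org.apache.spark.sql.functions.soundex

// Encode surnames with SoundEx; expected codes: P362, U612.
val names = sqlContext.createDataFrame(Seq(Tuple1("Peters"), Tuple1("Uhrbach"))).toDF("name")
names.select(soundex(names("name")).as("sx")).show()
```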

Author: HuJiayin 
Author: Davies Liu 

Closes #7812 from davies/soundex and squashes the following commits:

fa75941 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
soundex
a4bd6d8 [Davies Liu] fix soundex
2538908 [HuJiayin] add codegen soundex
d15d329 [HuJiayin] add back ut
ded1a14 [HuJiayin] Merge branch 'master' of https://github.com/apache/spark
e2dec2c [HuJiayin] support soundex rebase code


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d5a6e7b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d5a6e7b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d5a6e7b

Branch: refs/heads/master
Commit: 4d5a6e7b60b315968973e22985eb174ec721
Parents: 3fc0cb9
Author: HuJiayin 
Authored: Fri Jul 31 16:05:26 2015 -0700
Committer: Reynold Xin 
Committed: Fri Jul 31 16:05:26 2015 -0700

--
 python/pyspark/sql/functions.py | 17 +++
 .../catalyst/analysis/FunctionRegistry.scala|  1 +
 .../catalyst/expressions/stringOperations.scala | 16 ++
 .../expressions/StringExpressionsSuite.scala| 28 +++
 .../scala/org/apache/spark/sql/functions.scala  |  8 +++
 .../apache/spark/sql/StringFunctionsSuite.scala |  9 
 .../apache/spark/unsafe/types/UTF8String.java   | 53 
 .../spark/unsafe/types/UTF8StringSuite.java | 48 ++
 8 files changed, 180 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4d5a6e7b/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 8024a8d..bb9926c 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -63,6 +63,8 @@ __all__ += [
 'year', 'quarter', 'month', 'hour', 'minute', 'second',
 'dayofmonth', 'dayofyear', 'weekofyear']
 
+__all__ += ['soundex']
+
 
 def _create_function(name, doc=""):
 """ Create a function for aggregator by name"""
@@ -922,6 +924,7 @@ def trunc(date, format):
 def size(col):
 """
 Collection function: returns the length of the array or map stored in the 
column.
+
 :param col: name of column or expression
 
 >>> df = sqlContext.createDataFrame([([1, 2, 3],),([1],),([],)], ['data'])
@@ -932,6 +935,20 @@ def size(col):
 return Column(sc._jvm.functions.size(_to_java_column(col)))
 
 
+@since(1.5)
+@ignore_unicode_prefix
+def soundex(col):
+"""
+Returns the SoundEx encoding for a string
+
+>>> df = sqlContext.createDataFrame([("Peters",),("Uhrbach",)], ['name'])
+>>> df.select(soundex(df.name).alias("soundex")).collect()
+[Row(soundex=u'P362'), Row(soundex=u'U612')]
+"""
+sc = SparkContext._active_spark_context
+return Column(sc._jvm.functions.soundex(_to_java_column(col)))
+
+
 class UserDefinedFunction(object):
 """
 User defined function in Python

http://git-wip-us.apache.org/repos/asf/spark/blob/4d5a6e7b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 1bf7204..3f61a9a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -194,6 +194,7 @@ object FunctionRegistry {
 expression[StringRepeat]("repeat"),
 expression[StringReverse]("reverse"),
 expression[StringTrimRight]("rtrim"),
+expression[SoundEx]("soundex"),
 expression[StringSpace]("space"),
 expression[StringSplit]("split"),
 expression[Substring]("substr"),

http://git-wip-us.apache.org/repos/asf/spark/blob/4d5a6e7b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
index 684eac1..160e72f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
@@ -719,6 +719,22 @

spark git commit: [SPARK-9233] [SQL] Enable code-gen in window function unit tests

2015-07-31 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 710c2b5dd -> 3fc0cb920


[SPARK-9233] [SQL] Enable code-gen in window function unit tests

Since code-gen is enabled by default, it is better to run window function tests 
with code-gen.

https://issues.apache.org/jira/browse/SPARK-9233

Author: Yin Huai 

Closes #7832 from yhuai/SPARK-9233 and squashes the following commits:

4e4e4cc [Yin Huai] style
ca80e07 [Yin Huai] Test window function with codegen.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3fc0cb92
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3fc0cb92
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3fc0cb92

Branch: refs/heads/master
Commit: 3fc0cb92001798167a14c1377362a3335397dd4c
Parents: 710c2b5
Author: Yin Huai 
Authored: Fri Jul 31 14:13:06 2015 -0700
Committer: Yin Huai 
Committed: Fri Jul 31 14:13:06 2015 -0700

--
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  9 +++--
 .../HiveWindowFunctionQuerySuite.scala  | 38 +++-
 2 files changed, 12 insertions(+), 35 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3fc0cb92/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 51d910b..f5daba1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -853,8 +853,13 @@ class Analyzer(
   while (i < groupedWindowExpressions.size) {
 val ((partitionSpec, orderSpec), windowExpressions) = 
groupedWindowExpressions(i)
 // Set currentChild to the newly created Window operator.
-currentChild = Window(currentChild.output, windowExpressions,
-  partitionSpec, orderSpec, currentChild)
+currentChild =
+  Window(
+currentChild.output,
+windowExpressions,
+partitionSpec,
+orderSpec,
+currentChild)
 
 // Move to next Window Spec.
 i += 1

http://git-wip-us.apache.org/repos/asf/spark/blob/3fc0cb92/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
--
diff --git 
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
 
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
index 24a758f..92bb9e6 100644
--- 
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
+++ 
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.Utils
  * for different tests and there are a few properties needed to let Hive 
generate golden
  * files, every `createQueryTest` calls should explicitly set `reset` to 
`false`.
  */
-abstract class HiveWindowFunctionQueryBaseSuite extends HiveComparisonTest 
with BeforeAndAfter {
+class HiveWindowFunctionQuerySuite extends HiveComparisonTest with 
BeforeAndAfter {
   private val originalTimeZone = TimeZone.getDefault
   private val originalLocale = Locale.getDefault
   private val testTempDir = Utils.createTempDir()
@@ -759,21 +759,7 @@ abstract class HiveWindowFunctionQueryBaseSuite extends 
HiveComparisonTest with
 """.stripMargin, reset = false)
 }
 
-class HiveWindowFunctionQueryWithoutCodeGenSuite extends 
HiveWindowFunctionQueryBaseSuite {
-  var originalCodegenEnabled: Boolean = _
-  override def beforeAll(): Unit = {
-super.beforeAll()
-originalCodegenEnabled = conf.codegenEnabled
-sql("set spark.sql.codegen=false")
-  }
-
-  override def afterAll(): Unit = {
-sql(s"set spark.sql.codegen=$originalCodegenEnabled")
-super.afterAll()
-  }
-}
-
-abstract class HiveWindowFunctionQueryFileBaseSuite
+class HiveWindowFunctionQueryFileSuite
   extends HiveCompatibilitySuite with BeforeAndAfter {
   private val originalTimeZone = TimeZone.getDefault
   private val originalLocale = Locale.getDefault
@@ -789,11 +775,11 @@ abstract class HiveWindowFunctionQueryFileBaseSuite
 // The following settings are used for generating golden files with Hive.
 // We have to use kryo to correctly let Hive serialize plans with window 
functions.
 // This is used to generate golden files.
-sql("set hive.plan.serialization.format=kryo")
+// sql("set hive.plan.serialization

spark git commit: [SPARK-9324] [SPARK-9322] [SPARK-9321] [SPARKR] Some aliases for R-like functions in DataFrames

2015-07-31 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master 82f47b811 -> 710c2b5dd


[SPARK-9324] [SPARK-9322] [SPARK-9321] [SPARKR] Some aliases for R-like 
functions in DataFrames

Adds following aliases:
* unique (distinct)
* rbind (unionAll): accepts many DataFrames
* nrow (count)
* ncol
* dim
* names (columns): along with the replacement function to change names

Author: Hossein 

Closes #7764 from falaki/sparkR-alias and squashes the following commits:

56016f5 [Hossein] Updated R documentation
5e4a4d0 [Hossein] Removed extra code
f51cbef [Hossein] Merge branch 'master' into sparkR-alias
c1b88bd [Hossein] Moved setGeneric and other comments applied
d9307f8 [Hossein] Added tests
b5aa988 [Hossein] Added dim, ncol, nrow, names, rbind, and unique functions to 
DataFrames


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/710c2b5d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/710c2b5d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/710c2b5d

Branch: refs/heads/master
Commit: 710c2b5dd2dc6b8d947303ad8dfae4539b63fe11
Parents: 82f47b8
Author: Hossein 
Authored: Fri Jul 31 14:07:41 2015 -0700
Committer: Shivaram Venkataraman 
Committed: Fri Jul 31 14:08:18 2015 -0700

--
 R/pkg/NAMESPACE  |  6 +++
 R/pkg/R/DataFrame.R  | 90 +++
 R/pkg/R/generics.R   |  4 ++
 R/pkg/inst/tests/test_sparkSQL.R | 22 +++--
 4 files changed, 119 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/710c2b5d/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index a329e14..ff116cb 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -29,6 +29,7 @@ exportMethods("arrange",
   "count",
   "crosstab",
   "describe",
+  "dim",
   "distinct",
   "dropna",
   "dtypes",
@@ -45,11 +46,15 @@ exportMethods("arrange",
   "isLocal",
   "join",
   "limit",
+  "names",
+  "ncol",
+  "nrow",
   "orderBy",
   "mutate",
   "names",
   "persist",
   "printSchema",
+  "rbind",
   "registerTempTable",
   "rename",
   "repartition",
@@ -66,6 +71,7 @@ exportMethods("arrange",
   "summarize",
   "take",
   "unionAll",
+  "unique",
   "unpersist",
   "where",
   "withColumn",

http://git-wip-us.apache.org/repos/asf/spark/blob/710c2b5d/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index b31ad37..b4065d2 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -255,6 +255,16 @@ setMethod("names",
 columns(x)
   })
 
+#' @rdname columns
+setMethod("names<-",
+  signature(x = "DataFrame"),
+  function(x, value) {
+if (!is.null(value)) {
+  sdf <- callJMethod(x@sdf, "toDF", listToSeq(as.list(value)))
+  dataFrame(sdf)
+}
+  })
+
 #' Register Temporary Table
 #'
 #' Registers a DataFrame as a Temporary Table in the SQLContext
@@ -473,6 +483,18 @@ setMethod("distinct",
 dataFrame(sdf)
   })
 
+#' @title Distinct rows in a DataFrame
+#
+#' @description Returns a new DataFrame containing distinct rows in this 
DataFrame
+#'
+#' @rdname unique
+#' @aliases unique
+setMethod("unique",
+  signature(x = "DataFrame"),
+  function(x) {
+distinct(x)
+  })
+
 #' Sample
 #'
 #' Return a sampled subset of this DataFrame using a random seed.
@@ -534,6 +556,58 @@ setMethod("count",
 callJMethod(x@sdf, "count")
   })
 
+#' @title Number of rows for a DataFrame
+#' @description Returns the number of rows in a DataFrame
+#'
+#' @name nrow
+#'
+#' @rdname nrow
+#' @aliases count
+setMethod("nrow",
+  signature(x = "DataFrame"),
+  function(x) {
+count(x)
+  })
+
+#' Returns the number of columns in a DataFrame
+#'
+#' @param x a SparkSQL DataFrame
+#'
+#' @rdname ncol
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlContext, path)
+#' ncol(df)
+#' }
+setMethod("ncol",
+  signature(x = "DataFrame"),
+  function(x) {
+length(columns(x))
+  })
+
+#' Returns the dimensions (number of rows and columns) of a DataFrame
+#' @param x a SparkSQL DataFrame
+#'
+#' @rdname dim
+#' @export
+#' @examples
+#'\don

spark git commit: [SPARK-9507] [BUILD] Remove dependency reduced POM hack now that shade plugin is updated

2015-07-31 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 f941482b0 -> 047a61365


[SPARK-9507] [BUILD] Remove dependency reduced POM hack now that shade plugin 
is updated

Update to shade plugin 2.4.1, which removes the need for the 
dependency-reduced-POM workaround and the 'release' profile. Fix management of 
shade plugin version so children inherit it; bump assembly plugin version while 
here

See https://issues.apache.org/jira/browse/SPARK-8819

I verified that `mvn clean package -DskipTests` works with Maven 3.3.3.

pwendell are you up for trying this for the 1.5.0 release?

Author: Sean Owen 

Closes #7826 from srowen/SPARK-9507 and squashes the following commits:

e0b0fd2 [Sean Owen] Update to shade plugin 2.4.1, which removes the need for 
the dependency-reduced-POM workaround and the 'release' profile. Fix management 
of shade plugin version so children inherit it; bump assembly plugin version 
while here

(cherry picked from commit 6e5fd613ea4b9aa0ab485ba681277a51a4367168)
Signed-off-by: Sean Owen 

# Conflicts:
#   dev/create-release/create-release.sh
#   pom.xml


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/047a6136
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/047a6136
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/047a6136

Branch: refs/heads/branch-1.3
Commit: 047a61365fa795f8e646b61049cde038d074a78c
Parents: f941482
Author: Sean Owen 
Authored: Fri Jul 31 21:51:55 2015 +0100
Committer: Sean Owen 
Committed: Fri Jul 31 22:05:57 2015 +0100

--
 dev/create-release/create-release.sh |  8 
 pom.xml  | 29 +
 2 files changed, 9 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/047a6136/dev/create-release/create-release.sh
--
diff --git a/dev/create-release/create-release.sh 
b/dev/create-release/create-release.sh
index 0979d5e..120914f 100755
--- a/dev/create-release/create-release.sh
+++ b/dev/create-release/create-release.sh
@@ -117,13 +117,13 @@ if [[ ! "$@" =~ --skip-publish ]]; then
   echo "Created Nexus staging repository: $staged_repo_id"
 
   build/mvn -DskipTests -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
--Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl 
-Pkinesis-asl -Prelease-profile \
+-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl 
-Pkinesis-asl \
 clean install
 
   ./dev/change-version-to-2.11.sh
-  
+
   build/mvn -DskipTests -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
--Dscala-2.11 -Pyarn -Phive -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl 
-Prelease-profile \
+-Dscala-2.11 -Pyarn -Phive -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl 
\
 clean install
 
   ./dev/change-version-to-2.10.sh
@@ -197,7 +197,7 @@ if [[ ! "$@" =~ --skip-package ]]; then
 NAME=$1
 FLAGS=$2
 cp -r spark spark-$RELEASE_VERSION-bin-$NAME
-
+
 cd spark-$RELEASE_VERSION-bin-$NAME
 
 # TODO There should probably be a flag to make-distribution to allow 2.11 
support

http://git-wip-us.apache.org/repos/asf/spark/blob/047a6136/pom.xml
--
diff --git a/pom.xml b/pom.xml
index bcc2f51..099b002 100644
--- a/pom.xml
+++ b/pom.xml
@@ -156,7 +156,6 @@
 1.8.8
 2.4.4
 1.1.1.7
-false
 
 
-  
${create.dependency.reduced.pom}
   
 
   
@@ -1751,26 +1752,6 @@
   
 
 
-
-  
-  release-profile
-  
-
-true
-  
-
-
 

spark git commit: [SPARK-9510] [SPARKR] Remaining SparkR style fixes

2015-07-31 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master 6e5fd613e -> 82f47b811


[SPARK-9510] [SPARKR] Remaining SparkR style fixes

With the change in this patch, I get no more warnings from `./dev/lint-r` on my machine

Author: Shivaram Venkataraman 

Closes #7834 from shivaram/sparkr-style-fixes and squashes the following 
commits:

716cd8e [Shivaram Venkataraman] Remaining SparkR style fixes


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82f47b81
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82f47b81
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82f47b81

Branch: refs/heads/master
Commit: 82f47b811607a1cba437fe0ffc15d4e5f9ec
Parents: 6e5fd61
Author: Shivaram Venkataraman 
Authored: Fri Jul 31 14:02:44 2015 -0700
Committer: Shivaram Venkataraman 
Committed: Fri Jul 31 14:02:44 2015 -0700

--
 R/pkg/R/RDD.R| 6 +++---
 R/pkg/inst/tests/test_sparkSQL.R | 4 +++-
 2 files changed, 6 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/82f47b81/R/pkg/R/RDD.R
--
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 2a013b3..051e441 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -1264,12 +1264,12 @@ setMethod("pipeRDD",
   signature(x = "RDD", command = "character"),
   function(x, command, env = list()) {
 func <- function(part) {
-  trim.trailing.func <- function(x) {
+  trim_trailing_func <- function(x) {
 sub("[\r\n]*$", "", toString(x))
   }
-  input <- unlist(lapply(part, trim.trailing.func))
+  input <- unlist(lapply(part, trim_trailing_func))
   res <- system2(command, stdout = TRUE, input = input, env = env)
-  lapply(res, trim.trailing.func)
+  lapply(res, trim_trailing_func)
 }
 lapplyPartition(x, func)
   })

http://git-wip-us.apache.org/repos/asf/spark/blob/82f47b81/R/pkg/inst/tests/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index aca41aa..25f6973 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -128,7 +128,9 @@ test_that("create DataFrame from RDD", {
   expect_equal(dtypes(df2), list(c("name", "string"), c("age", "int"), 
c("height", "float")))
   expect_equal(collect(where(df2, df2$name == "Bob")), c("Bob", 16, 176.5))
 
-  localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18), 
height=c(164.10, 181.4, 173.7))
+  localDF <- data.frame(name=c("John", "Smith", "Sarah"),
+age=c(19, 23, 18),
+height=c(164.10, 181.4, 173.7))
   df <- createDataFrame(sqlContext, localDF, schema)
   expect_is(df, "DataFrame")
   expect_equal(count(df), 3)





spark git commit: [SPARK-9507] [BUILD] Remove dependency reduced POM hack now that shade plugin is updated

2015-07-31 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 873ab0f96 -> 6e5fd613e


[SPARK-9507] [BUILD] Remove dependency reduced POM hack now that shade plugin 
is updated

Update to shade plugin 2.4.1, which removes the need for the 
dependency-reduced-POM workaround and the 'release' profile. Fix management of 
shade plugin version so children inherit it; bump assembly plugin version while 
here

See https://issues.apache.org/jira/browse/SPARK-8819

I verified that `mvn clean package -DskipTests` works with Maven 3.3.3.

pwendell are you up for trying this for the 1.5.0 release?

Author: Sean Owen 

Closes #7826 from srowen/SPARK-9507 and squashes the following commits:

e0b0fd2 [Sean Owen] Update to shade plugin 2.4.1, which removes the need for 
the dependency-reduced-POM workaround and the 'release' profile. Fix management 
of shade plugin version so children inherit it; bump assembly plugin version 
while here


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e5fd613
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e5fd613
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e5fd613

Branch: refs/heads/master
Commit: 6e5fd613ea4b9aa0ab485ba681277a51a4367168
Parents: 873ab0f
Author: Sean Owen 
Authored: Fri Jul 31 21:51:55 2015 +0100
Committer: Sean Owen 
Committed: Fri Jul 31 21:51:55 2015 +0100

--
 dev/create-release/create-release.sh |  4 ++--
 pom.xml  | 33 ++-
 2 files changed, 8 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6e5fd613/dev/create-release/create-release.sh
--
diff --git a/dev/create-release/create-release.sh 
b/dev/create-release/create-release.sh
index 86a7a40..4311c8c 100755
--- a/dev/create-release/create-release.sh
+++ b/dev/create-release/create-release.sh
@@ -118,13 +118,13 @@ if [[ ! "$@" =~ --skip-publish ]]; then
 
   rm -rf $SPARK_REPO
 
-  build/mvn -DskipTests -Pyarn -Phive -Prelease\
+  build/mvn -DskipTests -Pyarn -Phive \
 -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
 clean install
 
   ./dev/change-scala-version.sh 2.11
   
-  build/mvn -DskipTests -Pyarn -Phive -Prelease\
+  build/mvn -DskipTests -Pyarn -Phive \
 -Dscala-2.11 -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
 clean install
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6e5fd613/pom.xml
--
diff --git a/pom.xml b/pom.xml
index e351c7c..1371a1b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -160,9 +160,6 @@
 2.4.4
 1.1.1.7
 1.1.2
-
-false
-
 ${java.home}
 
 
-  
${create.dependency.reduced.pom}
   
 
   
@@ -1836,26 +1835,6 @@
   
 
 
-
-  
-  release
-  
-
-true
-  
-
-
 

spark git commit: [SPARK-9507] [BUILD] Remove dependency reduced POM hack now that shade plugin is updated

2015-07-31 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 5ad9f950c -> b53ca247d


[SPARK-9507] [BUILD] Remove dependency reduced POM hack now that shade plugin 
is updated

Update to shade plugin 2.4.1, which removes the need for the 
dependency-reduced-POM workaround and the 'release' profile. Fix management of 
shade plugin version so children inherit it; bump assembly plugin version while 
here

See https://issues.apache.org/jira/browse/SPARK-8819

I verified that `mvn clean package -DskipTests` works with Maven 3.3.3.

pwendell are you up for trying this for the 1.5.0 release?

Author: Sean Owen 

Closes #7826 from srowen/SPARK-9507 and squashes the following commits:

e0b0fd2 [Sean Owen] Update to shade plugin 2.4.1, which removes the need for 
the dependency-reduced-POM workaround and the 'release' profile. Fix management 
of shade plugin version so children inherit it; bump assembly plugin version 
while here

(cherry picked from commit 6e5fd613ea4b9aa0ab485ba681277a51a4367168)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b53ca247
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b53ca247
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b53ca247

Branch: refs/heads/branch-1.4
Commit: b53ca247d4a965002a9f31758ea2b28fe117d45f
Parents: 5ad9f95
Author: Sean Owen 
Authored: Fri Jul 31 21:51:55 2015 +0100
Committer: Sean Owen 
Committed: Fri Jul 31 21:52:11 2015 +0100

--
 dev/create-release/create-release.sh |  4 ++--
 pom.xml  | 33 ++-
 2 files changed, 8 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b53ca247/dev/create-release/create-release.sh
--
diff --git a/dev/create-release/create-release.sh 
b/dev/create-release/create-release.sh
index 30190dc..54274a8 100755
--- a/dev/create-release/create-release.sh
+++ b/dev/create-release/create-release.sh
@@ -118,13 +118,13 @@ if [[ ! "$@" =~ --skip-publish ]]; then
 
   rm -rf $SPARK_REPO
 
-  build/mvn -DskipTests -Pyarn -Phive -Prelease\
+  build/mvn -DskipTests -Pyarn -Phive \
 -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
 clean install
 
   ./dev/change-version-to-2.11.sh
   
-  build/mvn -DskipTests -Pyarn -Phive -Prelease\
+  build/mvn -DskipTests -Pyarn -Phive \
 -Dscala-2.11 -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
 clean install
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b53ca247/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 53e9388..6c102da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -162,9 +162,6 @@
 2.4.4
 1.1.1.7
 1.1.2
-
-false
-
 ${java.home}
 
 
-  
${create.dependency.reduced.pom}
   
 
   
@@ -1817,26 +1816,6 @@
   
 
 
-
-  
-  release
-  
-
-true
-  
-
-
 

spark git commit: [SPARK-9490] [DOCS] [MLLIB] MLlib evaluation metrics guide example python code uses deprecated print statement

2015-07-31 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 815c8245f -> 873ab0f96


[SPARK-9490] [DOCS] [MLLIB] MLlib evaluation metrics guide example python code 
uses deprecated print statement

Use print(x) not print x for Python 3 in eval examples
CC sethah mengxr -- just wanted to close this out before 1.5

Author: Sean Owen 

Closes #7822 from srowen/SPARK-9490 and squashes the following commits:

01abeba [Sean Owen] Change "print x" to "print(x)" in the rest of the docs too
bd7f7fb [Sean Owen] Use print(x) not print x for Python 3 in eval examples


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/873ab0f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/873ab0f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/873ab0f9

Branch: refs/heads/master
Commit: 873ab0f9692d8ea6220abdb8d9200041068372a8
Parents: 815c824
Author: Sean Owen 
Authored: Fri Jul 31 13:45:28 2015 -0700
Committer: Xiangrui Meng 
Committed: Fri Jul 31 13:45:28 2015 -0700

--
 docs/ml-guide.md|  2 +-
 docs/mllib-evaluation-metrics.md| 66 
 docs/mllib-feature-extraction.md|  2 +-
 docs/mllib-statistics.md| 20 +-
 docs/quick-start.md |  2 +-
 docs/sql-programming-guide.md   |  6 +--
 docs/streaming-programming-guide.md |  2 +-
 7 files changed, 50 insertions(+), 50 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/873ab0f9/docs/ml-guide.md
--
diff --git a/docs/ml-guide.md b/docs/ml-guide.md
index 8c46adf..b6ca50e 100644
--- a/docs/ml-guide.md
+++ b/docs/ml-guide.md
@@ -561,7 +561,7 @@ test = sc.parallelize([(4L, "spark i j k"),
 prediction = model.transform(test)
 selected = prediction.select("id", "text", "prediction")
 for row in selected.collect():
-print row
+print(row)
 
 sc.stop()
 {% endhighlight %}

http://git-wip-us.apache.org/repos/asf/spark/blob/873ab0f9/docs/mllib-evaluation-metrics.md
--
diff --git a/docs/mllib-evaluation-metrics.md b/docs/mllib-evaluation-metrics.md
index 4ca0bb0..7066d5c 100644
--- a/docs/mllib-evaluation-metrics.md
+++ b/docs/mllib-evaluation-metrics.md
@@ -302,10 +302,10 @@ predictionAndLabels = test.map(lambda lp: 
(float(model.predict(lp.features)), lp
 metrics = BinaryClassificationMetrics(predictionAndLabels)
 
 # Area under precision-recall curve
-print "Area under PR = %s" % metrics.areaUnderPR
+print("Area under PR = %s" % metrics.areaUnderPR)
 
 # Area under ROC curve
-print "Area under ROC = %s" % metrics.areaUnderROC
+print("Area under ROC = %s" % metrics.areaUnderROC)
 
 {% endhighlight %}
 
@@ -606,24 +606,24 @@ metrics = MulticlassMetrics(predictionAndLabels)
 precision = metrics.precision()
 recall = metrics.recall()
 f1Score = metrics.fMeasure()
-print "Summary Stats"
-print "Precision = %s" % precision
-print "Recall = %s" % recall
-print "F1 Score = %s" % f1Score
+print("Summary Stats")
+print("Precision = %s" % precision)
+print("Recall = %s" % recall)
+print("F1 Score = %s" % f1Score)
 
 # Statistics by class
 labels = data.map(lambda lp: lp.label).distinct().collect()
 for label in sorted(labels):
-print "Class %s precision = %s" % (label, metrics.precision(label))
-print "Class %s recall = %s" % (label, metrics.recall(label))
-print "Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, 
beta=1.0))
+print("Class %s precision = %s" % (label, metrics.precision(label)))
+print("Class %s recall = %s" % (label, metrics.recall(label)))
+print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, 
beta=1.0)))
 
 # Weighted stats
-print "Weighted recall = %s" % metrics.weightedRecall
-print "Weighted precision = %s" % metrics.weightedPrecision
-print "Weighted F(1) Score = %s" % metrics.weightedFMeasure()
-print "Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5)
-print "Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate
+print("Weighted recall = %s" % metrics.weightedRecall)
+print("Weighted precision = %s" % metrics.weightedPrecision)
+print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
+print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
+print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)
 {% endhighlight %}
 
 
@@ -881,28 +881,28 @@ scoreAndLabels = sc.parallelize([
 metrics = MultilabelMetrics(scoreAndLabels)
 
 # Summary stats
-print "Recall = %s" % metrics.recall()
-print "Precision = %s" % metrics.precision()
-print "F1 measure = %s" % metrics.f1Measure()
-print "Accuracy = %s" % metrics.accuracy
+print("Recall = %s" % metrics.recall())
+print("Precision = %s" % metrics.precision())
+

spark git commit: [SPARK-9466] [SQL] Increase two timeouts in CliSuite.

2015-07-31 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master fbef566a1 -> 815c8245f


[SPARK-9466] [SQL] Increase two timeouts in CliSuite.

Hopefully this can resolve the flakiness of this suite.

JIRA: https://issues.apache.org/jira/browse/SPARK-9466

Author: Yin Huai 

Closes # from yhuai/SPARK-9466 and squashes the following commits:

e0e3a86 [Yin Huai] Increate the timeout.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/815c8245
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/815c8245
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/815c8245

Branch: refs/heads/master
Commit: 815c8245f47e61226a04e2e02f508457b5e9e536
Parents: fbef566
Author: Yin Huai 
Authored: Fri Jul 31 13:45:12 2015 -0700
Committer: Yin Huai 
Committed: Fri Jul 31 13:45:12 2015 -0700

--
 .../scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/815c8245/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
--
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 13b0c59..df80d04 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -137,7 +137,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter 
with Logging {
   }
 
   test("Single command with --database") {
-runCliWithin(1.minute)(
+runCliWithin(2.minute)(
   "CREATE DATABASE hive_test_db;"
 -> "OK",
   "USE hive_test_db;"
@@ -148,7 +148,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter 
with Logging {
 -> "Time taken: "
 )
 
-runCliWithin(1.minute, Seq("--database", "hive_test_db", "-e", "SHOW 
TABLES;"))(
+runCliWithin(2.minute, Seq("--database", "hive_test_db", "-e", "SHOW 
TABLES;"))(
   ""
 -> "OK",
   ""





spark git commit: [SPARK-9308] [ML] ml.NaiveBayesModel support predicting class probabilities

2015-07-31 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 060c79aab -> fbef566a1


[SPARK-9308] [ML] ml.NaiveBayesModel support predicting class probabilities

Make NaiveBayesModel support predicting class probabilities, inherit from 
ProbabilisticClassificationModel.
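
As a usage sketch (not part of this patch): after this change a fitted ml
NaiveBayesModel exposes per-class probabilities through the usual output
columns. The snippet assumes a `sqlContext` is in scope, as in spark-shell,
and uses a toy two-row training set.
```
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors

// Toy training set with the conventional "label" and "features" columns.
val training = sqlContext.createDataFrame(Seq(
  (0.0, Vectors.dense(1.0, 0.0)),
  (1.0, Vectors.dense(0.0, 1.0))
)).toDF("label", "features")

val model = new NaiveBayes().fit(training)

// Because the model now extends ProbabilisticClassificationModel, transform()
// adds "rawPrediction" and "probability" columns alongside "prediction".
model.transform(training)
  .select("features", "rawPrediction", "probability", "prediction")
  .show()
```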

Author: Yanbo Liang 

Closes #7672 from yanboliang/spark-9308 and squashes the following commits:

25e224c [Yanbo Liang] raw2probabilityInPlace should operate in-place
3ee56d6 [Yanbo Liang] change predictRaw and raw2probabilityInPlace
c07e7a2 [Yanbo Liang] ml.NaiveBayesModel support predicting class probabilities


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbef566a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbef566a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbef566a

Branch: refs/heads/master
Commit: fbef566a107b47e5fddde0ea65b8587d5039062d
Parents: 060c79a
Author: Yanbo Liang 
Authored: Fri Jul 31 13:11:42 2015 -0700
Committer: Joseph K. Bradley 
Committed: Fri Jul 31 13:11:42 2015 -0700

--
 .../spark/ml/classification/NaiveBayes.scala| 65 +++-
 .../ml/classification/NaiveBayesSuite.scala | 54 +++-
 2 files changed, 101 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fbef566a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala 
b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
index 5be35fe..b46b676 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
@@ -69,7 +69,7 @@ private[ml] trait NaiveBayesParams extends PredictorParams {
  * The input feature values must be nonnegative.
  */
 class NaiveBayes(override val uid: String)
-  extends Predictor[Vector, NaiveBayes, NaiveBayesModel]
+  extends ProbabilisticClassifier[Vector, NaiveBayes, NaiveBayesModel]
   with NaiveBayesParams {
 
   def this() = this(Identifiable.randomUID("nb"))
@@ -106,7 +106,7 @@ class NaiveBayesModel private[ml] (
 override val uid: String,
 val pi: Vector,
 val theta: Matrix)
-  extends PredictionModel[Vector, NaiveBayesModel] with NaiveBayesParams {
+  extends ProbabilisticClassificationModel[Vector, NaiveBayesModel] with 
NaiveBayesParams {
 
   import OldNaiveBayes.{Bernoulli, Multinomial}
 
@@ -129,29 +129,62 @@ class NaiveBayesModel private[ml] (
   throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
   }
 
-  override protected def predict(features: Vector): Double = {
+  override val numClasses: Int = pi.size
+
+  private def multinomialCalculation(features: Vector) = {
+val prob = theta.multiply(features)
+BLAS.axpy(1.0, pi, prob)
+prob
+  }
+
+  private def bernoulliCalculation(features: Vector) = {
+features.foreachActive((_, value) =>
+  if (value != 0.0 && value != 1.0) {
+throw new SparkException(
+  s"Bernoulli naive Bayes requires 0 or 1 feature values but found 
$features.")
+  }
+)
+val prob = thetaMinusNegTheta.get.multiply(features)
+BLAS.axpy(1.0, pi, prob)
+BLAS.axpy(1.0, negThetaSum.get, prob)
+prob
+  }
+
+  override protected def predictRaw(features: Vector): Vector = {
 $(modelType) match {
   case Multinomial =>
-val prob = theta.multiply(features)
-BLAS.axpy(1.0, pi, prob)
-prob.argmax
+multinomialCalculation(features)
   case Bernoulli =>
-features.foreachActive{ (index, value) =>
-  if (value != 0.0 && value != 1.0) {
-throw new SparkException(
-  s"Bernoulli naive Bayes requires 0 or 1 feature values but found 
$features")
-  }
-}
-val prob = thetaMinusNegTheta.get.multiply(features)
-BLAS.axpy(1.0, pi, prob)
-BLAS.axpy(1.0, negThetaSum.get, prob)
-prob.argmax
+bernoulliCalculation(features)
   case _ =>
 // This should never happen.
 throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
 }
   }
 
+  override protected def raw2probabilityInPlace(rawPrediction: Vector): Vector 
= {
+rawPrediction match {
+  case dv: DenseVector =>
+var i = 0
+val size = dv.size
+val maxLog = dv.values.max
+while (i < size) {
+  dv.values(i) = math.exp(dv.values(i) - maxLog)
+  i += 1
+}
+val probSum = dv.values.sum
+i = 0
+while (i < size) {
+  dv.values(i) = dv.values(i) / probSum
+  i += 1
+}
+dv
+  case sv: SparseVector =>
+throw new RuntimeException(

spark git commit: [SPARK-9056] [STREAMING] Rename configuration `spark.streaming.minRememberDuration` to `spark.streaming.fileStream.minRememberDuration`

2015-07-31 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 3c0d2e552 -> 060c79aab


[SPARK-9056] [STREAMING] Rename configuration 
`spark.streaming.minRememberDuration` to 
`spark.streaming.fileStream.minRememberDuration`

Rename configuration `spark.streaming.minRememberDuration` to 
`spark.streaming.fileStream.minRememberDuration`
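
A minimal sketch of switching to the new key (the application name and value
below are illustrative); the old key still works through SparkConf's
alternate-config mapping shown in the diff, but logs a deprecation warning.
```
import org.apache.spark.SparkConf

// Prefer the new, more specific key; "120s" is an arbitrary example value.
val conf = new SparkConf()
  .setAppName("FileStreamApp") // hypothetical application name
  .set("spark.streaming.fileStream.minRememberDuration", "120s")
```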

Author: Sameer Abhyankar 
Author: Sameer Abhyankar 

Closes #7740 from sabhyankar/spark_branch_9056 and squashes the following 
commits:

d5b2f1f [Sameer Abhyankar] Correct deprecated version to 1.5
1268133 [Sameer Abhyankar] Add {} and indentation
ddf9844 [Sameer Abhyankar] Change 4 space indentation to 2 space indentation
1819b5f [Sameer Abhyankar] Use spark.streaming.fileStream.minRememberDuration 
property in lieu of spark.streaming.minRememberDuration


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/060c79aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/060c79aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/060c79aa

Branch: refs/heads/master
Commit: 060c79aab58efd4ce7353a1b00534de0d9e1de0b
Parents: 3c0d2e5
Author: Sameer Abhyankar 
Authored: Fri Jul 31 13:08:55 2015 -0700
Committer: Tathagata Das 
Committed: Fri Jul 31 13:08:55 2015 -0700

--
 core/src/main/scala/org/apache/spark/SparkConf.scala   | 4 +++-
 .../org/apache/spark/streaming/dstream/FileInputDStream.scala  | 6 --
 2 files changed, 7 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/060c79aa/core/src/main/scala/org/apache/spark/SparkConf.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala 
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 4161792..08bab4b 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -548,7 +548,9 @@ private[spark] object SparkConf extends Logging {
 "spark.rpc.askTimeout" -> Seq(
   AlternateConfig("spark.akka.askTimeout", "1.4")),
 "spark.rpc.lookupTimeout" -> Seq(
-  AlternateConfig("spark.akka.lookupTimeout", "1.4"))
+  AlternateConfig("spark.akka.lookupTimeout", "1.4")),
+"spark.streaming.fileStream.minRememberDuration" -> Seq(
+  AlternateConfig("spark.streaming.minRememberDuration", "1.5"))
 )
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/060c79aa/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index dd4da9d..c358f5b 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -86,8 +86,10 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
* Files with mod times older than this "window" of remembering will be 
ignored. So if new
* files are visible within this window, then the file will get selected in 
the next batch.
*/
-  private val minRememberDurationS =
-Seconds(ssc.conf.getTimeAsSeconds("spark.streaming.minRememberDuration", 
"60s"))
+  private val minRememberDurationS = {
+
Seconds(ssc.conf.getTimeAsSeconds("spark.streaming.fileStream.minRememberDuration",
+  ssc.conf.get("spark.streaming.minRememberDuration", "60s")))
+  }
 
   // This is a def so that it works during checkpoint recovery:
   private def clock = ssc.scheduler.clock





spark git commit: [SPARK-9246] [MLLIB] DistributedLDAModel predict top docs per topic

2015-07-31 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master c0686668a -> 3c0d2e552


[SPARK-9246] [MLLIB] DistributedLDAModel predict top docs per topic

Add topDocumentsPerTopic to DistributedLDAModel.

Add ScalaDoc and unit tests.
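
A usage sketch, assuming `distributedModel` is a DistributedLDAModel obtained
from LDA with the EM optimizer:
```
// Top 3 documents per topic, as (document IDs, topic weights in those documents).
val topDocs: Array[(Array[Long], Array[Double])] =
  distributedModel.topDocumentsPerTopic(3)

topDocs.zipWithIndex.foreach { case ((docIds, weights), topic) =>
  println(s"topic $topic: " + docIds.zip(weights).mkString(", "))
}
```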

Author: Meihua Wu 

Closes #7769 from rotationsymmetry/SPARK-9246 and squashes the following 
commits:

1029e79c [Meihua Wu] clean up code comments
a023b82 [Meihua Wu] Update tests to use Long for doc index.
91e5998 [Meihua Wu] Use Long for doc index.
b9f70cf [Meihua Wu] Revise topDocumentsPerTopic
26ff3f6 [Meihua Wu] Add topDocumentsPerTopic, scala doc and unit tests


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c0d2e55
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c0d2e55
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c0d2e55

Branch: refs/heads/master
Commit: 3c0d2e55210735e0df2f8febb5f63c224af230e3
Parents: c068666
Author: Meihua Wu 
Authored: Fri Jul 31 13:01:10 2015 -0700
Committer: Joseph K. Bradley 
Committed: Fri Jul 31 13:01:10 2015 -0700

--
 .../spark/mllib/clustering/LDAModel.scala   | 37 
 .../spark/mllib/clustering/LDASuite.scala   | 22 
 2 files changed, 59 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3c0d2e55/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index ff7035d..0cdac84 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -516,6 +516,43 @@ class DistributedLDAModel private[clustering] (
 }
   }
 
+  /**
+   * Return the top documents for each topic
+   *
+   * This is approximate; it may not return exactly the top-weighted documents 
for each topic.
+   * To get a more precise set of top documents, increase maxDocumentsPerTopic.
+   *
+   * @param maxDocumentsPerTopic  Maximum number of documents to collect for 
each topic.
+   * @return  Array over topics.  Each element represent as a pair of matching 
arrays:
+   *  (IDs for the documents, weights of the topic in these documents).
+   *  For each topic, documents are sorted in order of decreasing 
topic weights.
+   */
+  def topDocumentsPerTopic(maxDocumentsPerTopic: Int): Array[(Array[Long], 
Array[Double])] = {
+val numTopics = k
+val topicsInQueues: Array[BoundedPriorityQueue[(Double, Long)]] =
+  topicDistributions.mapPartitions { docVertices =>
+// For this partition, collect the most common docs for each topic in 
queues:
+//  queues(topic) = queue of (doc topic, doc ID).
+val queues =
+  Array.fill(numTopics)(new BoundedPriorityQueue[(Double, 
Long)](maxDocumentsPerTopic))
+for ((docId, docTopics) <- docVertices) {
+  var topic = 0
+  while (topic < numTopics) {
+queues(topic) += (docTopics(topic) -> docId)
+topic += 1
+  }
+}
+Iterator(queues)
+  }.treeReduce { (q1, q2) =>
+q1.zip(q2).foreach { case (a, b) => a ++= b }
+q1
+  }
+topicsInQueues.map { q =>
+  val (docTopics, docs) = q.toArray.sortBy(-_._1).unzip
+  (docs.toArray, docTopics.toArray)
+}
+  }
+
   // TODO
   // override def logLikelihood(documents: RDD[(Long, Vector)]): Double = ???
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0d2e55/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index 79d2a1c..f2b9470 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -122,6 +122,28 @@ class LDASuite extends SparkFunSuite with 
MLlibTestSparkContext {
 // Check: log probabilities
 assert(model.logLikelihood < 0.0)
 assert(model.logPrior < 0.0)
+
+// Check: topDocumentsPerTopic
+// Compare it with top documents per topic derived from topicDistributions
+val topDocsByTopicDistributions = { n: Int =>
+  Range(0, k).map { topic =>
+val (doc, docWeights) = 
topicDistributions.sortBy(-_._2(topic)).take(n).unzip
+(doc.toArray, docWeights.map(_(topic)).toArray)
+  }.toArray
+}
+
+// Top 3 documents per topic
+model.topDocumentsPerTopic(3).zip(topDocsByTopicDistributions(3)).foreach 
{case (t1, t2) =>
+  assert(t1

spark git commit: [SPARK-9202] capping maximum number of executor&driver information kept in Worker

2015-07-31 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master a8340fa7d -> c0686668a


[SPARK-9202] capping maximum number of executor&driver information kept in 
Worker

https://issues.apache.org/jira/browse/SPARK-9202
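
For illustration only, the two new caps added by this patch; in a real
deployment they would normally be set in the worker's spark-defaults.conf or
via SPARK_WORKER_OPTS rather than in application code, and the values here
are arbitrary.
```
import org.apache.spark.SparkConf

// Keep at most 500 finished executors and drivers in the Worker's state/UI.
val conf = new SparkConf()
  .set("spark.worker.ui.retainedExecutors", "500")
  .set("spark.worker.ui.retainedDrivers", "500")
```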

Author: CodingCat 

Closes #7714 from CodingCat/SPARK-9202 and squashes the following commits:

23977fb [CodingCat] add comments about why we don't synchronize 
finishedExecutors & finishedDrivers
dc9772d [CodingCat] addressing the comments
e125241 [CodingCat] stylistic fix
80bfe52 [CodingCat] fix JsonProtocolSuite
d7d9485 [CodingCat] styistic fix and respect insert ordering
031755f [CodingCat] add license info & stylistic fix
c3b5361 [CodingCat] test cases and docs
c557b3a [CodingCat] applications are fine
9cac751 [CodingCat] application is fine...
ad87ed7 [CodingCat] trimFinishedExecutorsAndDrivers


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0686668
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0686668
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0686668

Branch: refs/heads/master
Commit: c0686668ae6a92b6bb4801a55c3b78aedbee816a
Parents: a8340fa
Author: CodingCat 
Authored: Fri Jul 31 20:27:00 2015 +0100
Committer: Sean Owen 
Committed: Fri Jul 31 20:27:00 2015 +0100

--
 .../org/apache/spark/deploy/worker/Worker.scala | 124 +++--
 .../spark/deploy/worker/ui/WorkerWebUI.scala|   4 +-
 .../apache/spark/deploy/DeployTestUtils.scala   |  89 +
 .../apache/spark/deploy/JsonProtocolSuite.scala |  59 ++--
 .../spark/deploy/worker/WorkerSuite.scala   | 133 ++-
 docs/configuration.md   |  14 ++
 6 files changed, 329 insertions(+), 94 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c0686668/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 82e9578..0276c24 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -25,7 +25,7 @@ import java.util.concurrent._
 import java.util.concurrent.{Future => JFuture, ScheduledFuture => 
JScheduledFuture}
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashMap, HashSet}
+import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
 import scala.concurrent.ExecutionContext
 import scala.util.Random
 import scala.util.control.NonFatal
@@ -115,13 +115,18 @@ private[worker] class Worker(
 }
 
   var workDir: File = null
-  val finishedExecutors = new HashMap[String, ExecutorRunner]
+  val finishedExecutors = new LinkedHashMap[String, ExecutorRunner]
   val drivers = new HashMap[String, DriverRunner]
   val executors = new HashMap[String, ExecutorRunner]
-  val finishedDrivers = new HashMap[String, DriverRunner]
+  val finishedDrivers = new LinkedHashMap[String, DriverRunner]
   val appDirectories = new HashMap[String, Seq[String]]
   val finishedApps = new HashSet[String]
 
+  val retainedExecutors = conf.getInt("spark.worker.ui.retainedExecutors",
+WorkerWebUI.DEFAULT_RETAINED_EXECUTORS)
+  val retainedDrivers = conf.getInt("spark.worker.ui.retainedDrivers",
+WorkerWebUI.DEFAULT_RETAINED_DRIVERS)
+
   // The shuffle service is not actually started unless configured.
   private val shuffleService = new ExternalShuffleService(conf, securityMgr)
 
@@ -461,25 +466,7 @@ private[worker] class Worker(
   }
 
 case executorStateChanged @ ExecutorStateChanged(appId, execId, state, 
message, exitStatus) =>
-  sendToMaster(executorStateChanged)
-  val fullId = appId + "/" + execId
-  if (ExecutorState.isFinished(state)) {
-executors.get(fullId) match {
-  case Some(executor) =>
-logInfo("Executor " + fullId + " finished with state " + state +
-  message.map(" message " + _).getOrElse("") +
-  exitStatus.map(" exitStatus " + _).getOrElse(""))
-executors -= fullId
-finishedExecutors(fullId) = executor
-coresUsed -= executor.cores
-memoryUsed -= executor.memory
-  case None =>
-logInfo("Unknown Executor " + fullId + " finished with state " + 
state +
-  message.map(" message " + _).getOrElse("") +
-  exitStatus.map(" exitStatus " + _).getOrElse(""))
-}
-maybeCleanupApplication(appId)
-  }
+  handleExecutorStateChanged(executorStateChanged)
 
 case KillExecutor(masterUrl, appId, execId) =>
   if (masterUrl != activeMasterUrl) {
@@ -523,24 +510,8 @@ private[worker] class Worker(
   }
 }
 
-

spark git commit: [SPARK-9481] Add logLikelihood to LocalLDAModel

2015-07-31 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master d04634701 -> a8340fa7d


[SPARK-9481] Add logLikelihood to LocalLDAModel

jkbradley Exposes `bound` (variational log likelihood bound) through public API 
as `logLikelihood`. Also adds unit tests, some DRYing of `LDASuite`, and 
includes unit tests mentioned in #7760
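
A usage sketch of the new method, assuming `sc` is a SparkContext and
`localModel` is a LocalLDAModel (e.g. trained with the online optimizer) over
a 6-term vocabulary:
```
import org.apache.spark.mllib.linalg.Vectors

// Held-out corpus of (docId, term-count vector) pairs.
val testDocs = sc.parallelize(Seq(
  (0L, Vectors.sparse(6, Array(0, 1), Array(1.0, 1.0))),
  (1L, Vectors.sparse(6, Array(1, 2), Array(1.0, 1.0)))
))

val ll = localModel.logLikelihood(testDocs)         // variational lower bound
val perplexity = localModel.logPerplexity(testDocs) // upper bound, per word
println(s"log likelihood = $ll, log perplexity per word = $perplexity")
```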

Author: Feynman Liang 

Closes #7801 from feynmanliang/SPARK-9481-logLikelihood and squashes the 
following commits:

6d1b2c9 [Feynman Liang] Negate perplexity definition
5f62b20 [Feynman Liang] Add logLikelihood


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8340fa7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8340fa7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8340fa7

Branch: refs/heads/master
Commit: a8340fa7df17e3f0a3658f8b8045ab840845a72a
Parents: d046347
Author: Feynman Liang 
Authored: Fri Jul 31 12:12:22 2015 -0700
Committer: Joseph K. Bradley 
Committed: Fri Jul 31 12:12:22 2015 -0700

--
 .../spark/mllib/clustering/LDAModel.scala   |  20 ++-
 .../spark/mllib/clustering/LDASuite.scala   | 129 ++-
 2 files changed, 78 insertions(+), 71 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a8340fa7/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 82281a0..ff7035d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -217,22 +217,28 @@ class LocalLDAModel private[clustering] (
 LocalLDAModel.SaveLoadV1_0.save(sc, path, topicsMatrix, docConcentration, 
topicConcentration,
   gammaShape)
   }
-  // TODO
-  // override def logLikelihood(documents: RDD[(Long, Vector)]): Double = ???
+
+  // TODO: declare in LDAModel and override once implemented in 
DistributedLDAModel
+  /**
+   * Calculates a lower bound on the log likelihood of the entire corpus.
+   * @param documents test corpus to use for calculating log likelihood
+   * @return variational lower bound on the log likelihood of the entire corpus
+   */
+  def logLikelihood(documents: RDD[(Long, Vector)]): Double = bound(documents,
+docConcentration, topicConcentration, topicsMatrix.toBreeze.toDenseMatrix, 
gammaShape, k,
+vocabSize)
 
   /**
-   * Calculate the log variational bound on perplexity. See Equation (16) in 
original Online
+   * Calculate an upper bound on perplexity. See Equation (16) in 
original Online
* LDA paper.
* @param documents test corpus to use for calculating perplexity
-   * @return the log perplexity per word
+   * @return variational upper bound on log perplexity per word
*/
   def logPerplexity(documents: RDD[(Long, Vector)]): Double = {
 val corpusWords = documents
   .map { case (_, termCounts) => termCounts.toArray.sum }
   .sum()
-val batchVariationalBound = bound(documents, docConcentration,
-  topicConcentration, topicsMatrix.toBreeze.toDenseMatrix, gammaShape, k, 
vocabSize)
-val perWordBound = batchVariationalBound / corpusWords
+val perWordBound = -logLikelihood(documents) / corpusWords
 
 perWordBound
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a8340fa7/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index 695ee3b..79d2a1c 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -210,16 +210,7 @@ class LDASuite extends SparkFunSuite with 
MLlibTestSparkContext {
   }
 
   test("OnlineLDAOptimizer with toy data") {
-def toydata: Array[(Long, Vector)] = Array(
-  Vectors.sparse(6, Array(0, 1), Array(1, 1)),
-  Vectors.sparse(6, Array(1, 2), Array(1, 1)),
-  Vectors.sparse(6, Array(0, 2), Array(1, 1)),
-  Vectors.sparse(6, Array(3, 4), Array(1, 1)),
-  Vectors.sparse(6, Array(3, 5), Array(1, 1)),
-  Vectors.sparse(6, Array(4, 5), Array(1, 1))
-).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, 
wordCounts) }
-
-val docs = sc.parallelize(toydata)
+val docs = sc.parallelize(toyData)
 val op = new 
OnlineLDAOptimizer().setMiniBatchFraction(1).setTau0(1024).setKappa(0.51)
   .setGammaShape(1e10)
 val lda = new LDA().setK(2)
@@ -242,30 +233,45 @@ class LDASuite extends SparkFunSuite with

spark git commit: [SPARK-9504] [STREAMING] [TESTS] Use eventually to fix the flaky test

2015-07-31 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 3afc1de89 -> d04634701


[SPARK-9504] [STREAMING] [TESTS] Use eventually to fix the flaky test

The previous code uses `ssc.awaitTerminationOrTimeout(500)`. Since nobody will 
stop it during `awaitTerminationOrTimeout`, it's just like `sleep(500)`. In a 
super overloaded Jenkins worker, the receiver may not be able to start in 500 
milliseconds. Verified this in the log of 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39149/ There 
is no log about starting the receiver before this failure. That's why 
`assert(runningCount > 0)` failed.

This PR replaces `awaitTerminationOrTimeout` with `eventually` which should be 
more reliable.
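
The pattern, as a self-contained sketch; the background thread here stands in
for the receiver that updates the counter in the real suite:
```
import java.util.concurrent.atomic.AtomicInteger
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

val runningCount = new AtomicInteger(0)

// Simulates the streaming job that eventually increments the counter.
new Thread(new Runnable {
  override def run(): Unit = { Thread.sleep(200); runningCount.incrementAndGet() }
}).start()

// Poll instead of sleeping a fixed 500 ms: a slow worker only makes the test
// wait longer, up to the 10 second timeout, rather than fail.
eventually(timeout(10.seconds), interval(10.milliseconds)) {
  assert(runningCount.get() > 0)
}
```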

Author: zsxwing 

Closes #7823 from zsxwing/SPARK-9504 and squashes the following commits:

7af66a6 [zsxwing] Remove wrong assertion
5ba2c99 [zsxwing] Use eventually to fix the flaky test


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0463470
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0463470
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0463470

Branch: refs/heads/master
Commit: d04634701413410938a133358fe1d9fbc077645e
Parents: 3afc1de
Author: zsxwing 
Authored: Fri Jul 31 12:10:55 2015 -0700
Committer: Tathagata Das 
Committed: Fri Jul 31 12:10:55 2015 -0700

--
 .../apache/spark/streaming/StreamingContextSuite.scala| 10 +-
 1 file changed, 5 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d0463470/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 84a5fbb..b7db280 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -261,7 +261,7 @@ class StreamingContextSuite extends SparkFunSuite with 
BeforeAndAfter with Timeo
 for (i <- 1 to 4) {
   logInfo("==\n\n\n")
   ssc = new StreamingContext(sc, Milliseconds(100))
-  var runningCount = 0
+  @volatile var runningCount = 0
   TestReceiver.counter.set(1)
   val input = ssc.receiverStream(new TestReceiver)
   input.count().foreachRDD { rdd =>
@@ -270,14 +270,14 @@ class StreamingContextSuite extends SparkFunSuite with 
BeforeAndAfter with Timeo
 logInfo("Count = " + count + ", Running count = " + runningCount)
   }
   ssc.start()
-  ssc.awaitTerminationOrTimeout(500)
+  eventually(timeout(10.seconds), interval(10.millis)) {
+assert(runningCount > 0)
+  }
   ssc.stop(stopSparkContext = false, stopGracefully = true)
   logInfo("Running count = " + runningCount)
   logInfo("TestReceiver.counter = " + TestReceiver.counter.get())
-  assert(runningCount > 0)
   assert(
-(TestReceiver.counter.get() == runningCount + 1) ||
-  (TestReceiver.counter.get() == runningCount + 2),
+TestReceiver.counter.get() == runningCount + 1,
 "Received records = " + TestReceiver.counter.get() + ", " +
   "processed records = " + runningCount
   )





spark git commit: [SPARK-8564] [STREAMING] Add the Python API for Kinesis

2015-07-31 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 39ab199a3 -> 3afc1de89


[SPARK-8564] [STREAMING] Add the Python API for Kinesis

This PR adds the Python API for Kinesis, including a Python example and a 
simple unit test.

Author: zsxwing 

Closes #6955 from zsxwing/kinesis-python and squashes the following commits:

e42e471 [zsxwing] Merge branch 'master' into kinesis-python
455f7ea [zsxwing] Remove streaming_kinesis_asl_assembly module and simply add 
the source folder to streaming_kinesis_asl module
32e6451 [zsxwing] Merge remote-tracking branch 'origin/master' into 
kinesis-python
5082d28 [zsxwing] Fix the syntax error for Python 2.6
fca416b [zsxwing] Fix wrong comparison
96670ff [zsxwing] Fix the compilation error after merging master
756a128 [zsxwing] Merge branch 'master' into kinesis-python
6c37395 [zsxwing] Print stack trace for debug
7c5cfb0 [zsxwing] RUN_KINESIS_TESTS -> ENABLE_KINESIS_TESTS
cc9d071 [zsxwing] Fix the python test errors
466b425 [zsxwing] Add python tests for Kinesis
e33d505 [zsxwing] Merge remote-tracking branch 'origin/master' into 
kinesis-python
3da2601 [zsxwing] Fix the kinesis folder
687446b [zsxwing] Fix the error message and the maven output path
add2beb [zsxwing] Merge branch 'master' into kinesis-python
4957c0b [zsxwing] Add the Python API for Kinesis


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3afc1de8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3afc1de8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3afc1de8

Branch: refs/heads/master
Commit: 3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0
Parents: 39ab199
Author: zsxwing 
Authored: Fri Jul 31 12:09:48 2015 -0700
Committer: Tathagata Das 
Committed: Fri Jul 31 12:09:48 2015 -0700

--
 dev/run-tests.py|   3 +-
 dev/sparktestsupport/modules.py |   9 +-
 docs/streaming-kinesis-integration.md   |  19 
 extras/kinesis-asl-assembly/pom.xml | 103 +
 .../examples/streaming/kinesis_wordcount_asl.py |  81 ++
 .../streaming/kinesis/KinesisTestUtils.scala|  19 +++-
 .../spark/streaming/kinesis/KinesisUtils.scala  |  78 ++---
 pom.xml |   1 +
 project/SparkBuild.scala|   6 +-
 python/pyspark/streaming/kinesis.py | 112 +++
 python/pyspark/streaming/tests.py   |  86 +-
 11 files changed, 492 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3afc1de8/dev/run-tests.py
--
diff --git a/dev/run-tests.py b/dev/run-tests.py
index 29420da..b6d1814 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -301,7 +301,8 @@ def build_spark_sbt(hadoop_version):
 sbt_goals = ["package",
  "assembly/assembly",
  "streaming-kafka-assembly/assembly",
- "streaming-flume-assembly/assembly"]
+ "streaming-flume-assembly/assembly",
+ "streaming-kinesis-asl-assembly/assembly"]
 profiles_and_goals = build_profiles + sbt_goals
 
 print("[info] Building Spark (w/Hive 0.13.1) using SBT with these 
arguments: ",

http://git-wip-us.apache.org/repos/asf/spark/blob/3afc1de8/dev/sparktestsupport/modules.py
--
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 44600cb..956dc81 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -138,6 +138,7 @@ streaming_kinesis_asl = Module(
 dependencies=[],
 source_file_regexes=[
 "extras/kinesis-asl/",
+"extras/kinesis-asl-assembly/",
 ],
 build_profile_flags=[
 "-Pkinesis-asl",
@@ -300,7 +301,13 @@ pyspark_sql = Module(
 
 pyspark_streaming = Module(
 name="pyspark-streaming",
-dependencies=[pyspark_core, streaming, streaming_kafka, 
streaming_flume_assembly],
+dependencies=[
+pyspark_core,
+streaming,
+streaming_kafka,
+streaming_flume_assembly,
+streaming_kinesis_asl
+],
 source_file_regexes=[
 "python/pyspark/streaming"
 ],

http://git-wip-us.apache.org/repos/asf/spark/blob/3afc1de8/docs/streaming-kinesis-integration.md
--
diff --git a/docs/streaming-kinesis-integration.md 
b/docs/streaming-kinesis-integration.md
index aa9749a..a7bcaec 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -52,6 +52,17 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
and the 
[example]({{site.SPARK_G

spark git commit: [SPARK-8640] [SQL] Enable Processing of Multiple Window Frames in a Single Window Operator

2015-07-31 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 0a1d2ca42 -> 39ab199a3


[SPARK-8640] [SQL] Enable Processing of Multiple Window Frames in a Single 
Window Operator

This PR enables the processing of multiple window frames in a single window 
operator. This should improve the performance of processing multiple window 
expressions wich share partition by/order by clauses, because it will be more 
efficient with respect to memory use and group processing.

Author: Herman van Hovell 

Closes #7515 from hvanhovell/SPARK-8640 and squashes the following commits:

f0e1c21 [Herman van Hovell] Changed Window Logical/Physical plans to use 
partition by/order by specs directly instead of using WindowSpec.
e1711c2 [Herman van Hovell] Enabled the processing of multiple window frames in 
a single Window operator.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39ab199a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39ab199a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39ab199a

Branch: refs/heads/master
Commit: 39ab199a3f735b7658ab3331d3e2fb03441aec13
Parents: 0a1d2ca
Author: Herman van Hovell 
Authored: Fri Jul 31 12:07:18 2015 -0700
Committer: Yin Huai 
Committed: Fri Jul 31 12:08:25 2015 -0700

--
 .../spark/sql/catalyst/analysis/Analyzer.scala   | 12 +++-
 .../catalyst/plans/logical/basicOperators.scala  |  3 ++-
 .../spark/sql/execution/SparkStrategies.scala|  5 +++--
 .../org/apache/spark/sql/execution/Window.scala  | 19 ++-
 .../spark/sql/hive/execution/HivePlanTest.scala  | 18 ++
 5 files changed, 40 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/39ab199a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 265f3d1..51d910b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -347,7 +347,7 @@ class Analyzer(
 val newOutput = oldVersion.generatorOutput.map(_.newInstance())
 (oldVersion, oldVersion.copy(generatorOutput = newOutput))
 
-  case oldVersion @ Window(_, windowExpressions, _, child)
+  case oldVersion @ Window(_, windowExpressions, _, _, child)
   if 
AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
 .nonEmpty =>
 (oldVersion, oldVersion.copy(windowExpressions = 
newAliases(windowExpressions)))
@@ -825,7 +825,7 @@ class Analyzer(
 }.asInstanceOf[NamedExpression]
   }
 
-  // Second, we group extractedWindowExprBuffer based on their Window Spec.
+  // Second, we group extractedWindowExprBuffer based on their Partition 
and Order Specs.
   val groupedWindowExpressions = extractedWindowExprBuffer.groupBy { expr 
=>
 val distinctWindowSpec = expr.collect {
   case window: WindowExpression => window.windowSpec
@@ -841,7 +841,8 @@ class Analyzer(
   failAnalysis(s"$expr has multiple Window Specifications 
($distinctWindowSpec)." +
 s"Please file a bug report with this error message, stack trace, 
and the query.")
 } else {
-  distinctWindowSpec.head
+  val spec = distinctWindowSpec.head
+  (spec.partitionSpec, spec.orderSpec)
 }
   }.toSeq
 
@@ -850,9 +851,10 @@ class Analyzer(
   var currentChild = child
   var i = 0
   while (i < groupedWindowExpressions.size) {
-val (windowSpec, windowExpressions) = groupedWindowExpressions(i)
+val ((partitionSpec, orderSpec), windowExpressions) = 
groupedWindowExpressions(i)
 // Set currentChild to the newly created Window operator.
-currentChild = Window(currentChild.output, windowExpressions, 
windowSpec, currentChild)
+currentChild = Window(currentChild.output, windowExpressions,
+  partitionSpec, orderSpec, currentChild)
 
 // Move to next Window Spec.
 i += 1

http://git-wip-us.apache.org/repos/asf/spark/blob/39ab199a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index a67f8de..aacfc86 100644
--- 
a/sql/catalyst/src/main/scala/org/ap

spark git commit: [SPARK-8979] Add a PID based rate estimator

2015-07-31 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master e8bdcdeab -> 0a1d2ca42


[SPARK-8979] Add a PID based rate estimator

Based on #7600

/cc tdas
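
For intuition, a minimal self-contained sketch of a PID-style rate update; the
error terms and default weights are illustrative, not the exact ones used by
PIDRateEstimator.
```
// newRate = latestRate - Kp * error - Ki * historicalError - Kd * dError
def pidUpdate(
    latestRate: Double,       // rate used for the last batch (elements/sec)
    processingRate: Double,   // elements actually processed per second
    historicalError: Double,  // accumulated error from queued-up elements
    latestError: Double,      // error measured for the previous batch
    proportional: Double = 1.0,
    integral: Double = 0.2,
    derivative: Double = 0.0): Double = {
  val error = latestRate - processingRate
  val dError = error - latestError
  // A negative rate makes no sense, so clamp at zero.
  math.max(latestRate - proportional * error - integral * historicalError - derivative * dError, 0.0)
}

// Processing lagged behind the requested rate, so the new rate comes down.
println(pidUpdate(latestRate = 1000, processingRate = 800, historicalError = 50, latestError = 100))
```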

Author: Iulian Dragos 
Author: François Garillot 

Closes #7648 from dragos/topic/streaming-bp/pid and squashes the following 
commits:

aa5b097 [Iulian Dragos] Add more comments, made all PID constant parameters 
positive, a couple more tests.
93b74f8 [Iulian Dragos] Better explanation of historicalError.
7975b0c [Iulian Dragos] Add configuration for PID.
26cfd78 [Iulian Dragos] A couple of variable renames.
d0bdf7c [Iulian Dragos] Update to latest version of the code, various style and 
name improvements.
d58b845 [François Garillot] [SPARK-8979][Streaming] Implements a 
PIDRateEstimator


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a1d2ca4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a1d2ca4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a1d2ca4

Branch: refs/heads/master
Commit: 0a1d2ca42c8b31d6b0e70163795f0185d4622f87
Parents: e8bdcde
Author: Iulian Dragos 
Authored: Fri Jul 31 12:04:03 2015 -0700
Committer: Tathagata Das 
Committed: Fri Jul 31 12:04:03 2015 -0700

--
 .../dstream/ReceiverInputDStream.scala  |   2 +-
 .../scheduler/rate/PIDRateEstimator.scala   | 124 +
 .../scheduler/rate/RateEstimator.scala  |  18 ++-
 .../scheduler/rate/PIDRateEstimatorSuite.scala  | 137 +++
 4 files changed, 276 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0a1d2ca4/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 646a8c3..670ef8d 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -46,7 +46,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient 
ssc_ : StreamingCont
*/
   override protected[streaming] val rateController: Option[RateController] = {
 if (RateController.isBackPressureEnabled(ssc.conf)) {
-  RateEstimator.create(ssc.conf).map { new ReceiverRateController(id, _) }
+  Some(new ReceiverRateController(id, RateEstimator.create(ssc.conf, 
ssc.graph.batchDuration)))
 } else {
   None
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0a1d2ca4/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala
new file mode 100644
index 000..6ae56a6
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler.rate
+
+/**
+ * Implements a proportional-integral-derivative (PID) controller which acts on
+ * the speed of ingestion of elements into Spark Streaming. A PID controller works
+ * by calculating an '''error''' between a measured output and a desired value. In the
+ * case of Spark Streaming the error is the difference between the measured processing
+ * rate (number of elements/processing delay) and the previous rate.
+ *
+ * @see https://en.wikipedia.org/wiki/PID_controller
+ *
+ * @param batchDurationMillis the batch duration, in milliseconds
+ * @param proportional how much the correction should depend on the current
+ *    error. This term usually provides the bulk of correction and should be positive or zero.
+ *    A value too l
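
The PIDRateEstimator source is truncated above; as a rough, standalone sketch of the kind of update rule such an estimator computes (the formula details, constants, and clamping below are assumptions made for illustration, not the actual Spark implementation):

```scala
// Illustrative only: a toy PID-style rate update, not Spark's PIDRateEstimator.
class PidSketch(proportional: Double, integral: Double, derivative: Double) {
  private var latestError = 0.0
  private var accumulatedError = 0.0

  /** Suggest a new ingestion rate (elements/sec) from the last batch's statistics. */
  def compute(processedElements: Long, processingDelayMs: Long, latestRate: Double): Double = {
    val delaySec = processingDelayMs / 1000.0
    val processingRate = processedElements / delaySec   // what was actually processed per second
    val error = latestRate - processingRate             // positive when ingestion outpaces processing
    accumulatedError += error * delaySec
    val dError = (error - latestError) / delaySec
    latestError = error
    val newRate = latestRate - proportional * error - integral * accumulatedError - derivative * dError
    math.max(newRate, 0.0)                              // a rate can never be negative
  }
}

// e.g. new PidSketch(1.0, 0.2, 0.0).compute(processedElements = 1000, processingDelayMs = 2000, latestRate = 800.0)
```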

spark git commit: [SPARK-6885] [ML] decision tree support predict class probabilities

2015-07-31 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 4011a9471 -> e8bdcdeab


[SPARK-6885] [ML] decision tree support predict class probabilities

Decision trees now support predicting class probabilities.
The prediction probabilities function is implemented with reference to the old DecisionTree 
API and the [sklearn 
API](https://github.com/scikit-learn/scikit-learn/blob/master/sklearn/tree/tree.py#L593).
I make DecisionTreeClassificationModel inherit from ProbabilisticClassificationModel, 
make predictRaw return the raw counts vector, and make 
raw2probabilityInPlace/predictProbability return the probabilities for each prediction.
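
As a standalone sketch (not the actual raw2probabilityInPlace code) of how a raw class-count vector becomes a probability vector, including the sum = 0 corner case mentioned in the squashed commits below, handled here by falling back to a uniform distribution:

```scala
object ProbabilitySketch {
  // Normalize raw per-class counts into probabilities; if all counts are zero,
  // fall back to a uniform distribution instead of dividing by zero.
  def countsToProbabilities(rawCounts: Array[Double]): Array[Double] = {
    val total = rawCounts.sum
    if (total == 0.0) Array.fill(rawCounts.length)(1.0 / rawCounts.length)
    else rawCounts.map(_ / total)
  }

  def main(args: Array[String]): Unit = {
    println(countsToProbabilities(Array(3.0, 1.0)).mkString(", "))   // 0.75, 0.25
    println(countsToProbabilities(Array(0.0, 0.0)).mkString(", "))   // 0.5, 0.5
  }
}
```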

Author: Yanbo Liang 

Closes #7694 from yanboliang/spark-6885 and squashes the following commits:

08d5b7f [Yanbo Liang] fix ImpurityStats null parameters and 
raw2probabilityInPlace sum = 0 issue
2174278 [Yanbo Liang] solve merge conflicts
7e90ba8 [Yanbo Liang] fix typos
33ae183 [Yanbo Liang] fix annotation
ff043d3 [Yanbo Liang] raw2probabilityInPlace should operate in-place
c32d6ce [Yanbo Liang] optimize calculateImpurityStats function again
6167fb0 [Yanbo Liang] optimize calculateImpurityStats function
fbbe2ec [Yanbo Liang] eliminate duplicated struct and code
beb1634 [Yanbo Liang] try to eliminate impurityStats for each LearningNode
99e8943 [Yanbo Liang] code optimization
5ec3323 [Yanbo Liang] implement InformationGainAndImpurityStats
227c91b [Yanbo Liang] refactor LearningNode to store ImpurityCalculator
d746ffc [Yanbo Liang] decision tree support predict class probabilities


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8bdcdea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8bdcdea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8bdcdea

Branch: refs/heads/master
Commit: e8bdcdeabb2df139a656f86686cdb53c891b1f4b
Parents: 4011a94
Author: Yanbo Liang 
Authored: Fri Jul 31 11:56:52 2015 -0700
Committer: Joseph K. Bradley 
Committed: Fri Jul 31 11:56:52 2015 -0700

--
 .../classification/DecisionTreeClassifier.scala |  40 --
 .../spark/ml/classification/GBTClassifier.scala |   2 +-
 .../classification/RandomForestClassifier.scala |   2 +-
 .../ml/regression/DecisionTreeRegressor.scala   |   2 +-
 .../spark/ml/regression/GBTRegressor.scala  |   2 +-
 .../ml/regression/RandomForestRegressor.scala   |   2 +-
 .../scala/org/apache/spark/ml/tree/Node.scala   |  80 ++--
 .../spark/ml/tree/impl/RandomForest.scala   | 126 ---
 .../spark/mllib/tree/impurity/Entropy.scala |   2 +-
 .../apache/spark/mllib/tree/impurity/Gini.scala |   2 +-
 .../spark/mllib/tree/impurity/Impurity.scala|   2 +-
 .../spark/mllib/tree/impurity/Variance.scala|   2 +-
 .../mllib/tree/model/InformationGainStats.scala |  61 -
 .../DecisionTreeClassifierSuite.scala   |  30 -
 .../ml/classification/GBTClassifierSuite.scala  |   2 +-
 .../RandomForestClassifierSuite.scala   |   2 +-
 16 files changed, 229 insertions(+), 130 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e8bdcdea/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
index 36fe1bd..f27cfd0 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
@@ -18,12 +18,11 @@
 package org.apache.spark.ml.classification
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.ml.{PredictionModel, Predictor}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.tree.{DecisionTreeModel, DecisionTreeParams, Node, 
TreeClassifierParams}
 import org.apache.spark.ml.tree.impl.RandomForest
 import org.apache.spark.ml.util.{Identifiable, MetadataUtils}
-import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, 
Vectors}
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => 
OldStrategy}
 import org.apache.spark.mllib.tree.model.{DecisionTreeModel => 
OldDecisionTreeModel}
@@ -39,7 +38,7 @@ import org.apache.spark.sql.DataFrame
  */
 @Experimental
 final class DecisionTreeClassifier(override val uid: String)
-  extends Predictor[Vector, DecisionTreeClassifier, 
DecisionTreeClassificationModel]
+  extends ProbabilisticClassifier[Vector, DecisionTreeClassifier, 
DecisionTreeClassificationModel]
   with DecisionTreeParams with TreeClassifierParams {
 
   def this() = this

spark git commit: [SPARK-9231] [MLLIB] DistributedLDAModel method for top topics per document

2015-07-31 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 6add4eddb -> 4011a9471


[SPARK-9231] [MLLIB] DistributedLDAModel method for top topics per document

jira: https://issues.apache.org/jira/browse/SPARK-9231

Helper method in DistributedLDAModel of this form:
```
/**
 * For each document, return the top k weighted topics for that document.
 * return RDD of (doc ID, topic indices, topic weights)
 */
def topTopicsPerDocument(k: Int): RDD[(Long, Array[Int], Array[Double])]
```
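
A hypothetical usage sketch of the helper; `model` is assumed to be an already-trained DistributedLDAModel, and the printed formatting is illustrative only:

```scala
import org.apache.spark.mllib.clustering.DistributedLDAModel

// Print the top 3 weighted topics for a handful of documents.
def showTopTopics(model: DistributedLDAModel): Unit = {
  model.topTopicsPerDocument(3).take(5).foreach { case (docId, indices, weights) =>
    val formatted = indices.zip(weights).map { case (i, w) => f"topic $i ($w%.3f)" }
    println(s"doc $docId: " + formatted.mkString(", "))
  }
}
```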

Author: Yuhao Yang 

Closes #7785 from hhbyyh/topTopicsPerdoc and squashes the following commits:

30ad153 [Yuhao Yang] small fix
fd24580 [Yuhao Yang] add topTopics per document to DistributedLDAModel


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4011a947
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4011a947
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4011a947

Branch: refs/heads/master
Commit: 4011a947154d97a9ffb5a71f077481a12534d36b
Parents: 6add4ed
Author: Yuhao Yang 
Authored: Fri Jul 31 11:50:15 2015 -0700
Committer: Joseph K. Bradley 
Committed: Fri Jul 31 11:50:15 2015 -0700

--
 .../apache/spark/mllib/clustering/LDAModel.scala | 19 ++-
 .../apache/spark/mllib/clustering/LDASuite.scala | 13 -
 2 files changed, 30 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4011a947/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 6cfad3f..82281a0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.mllib.clustering
 
-import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, normalize, sum}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argtopk, 
normalize, sum}
 import breeze.numerics.{exp, lgamma}
 import org.apache.hadoop.fs.Path
 import org.json4s.DefaultFormats
@@ -591,6 +591,23 @@ class DistributedLDAModel private[clustering] (
 JavaPairRDD.fromRDD(topicDistributions.asInstanceOf[RDD[(java.lang.Long, 
Vector)]])
   }
 
+  /**
+   * For each document, return the top k weighted topics for that document and 
their weights.
+   * @return RDD of (doc ID, topic indices, topic weights)
+   */
+  def topTopicsPerDocument(k: Int): RDD[(Long, Array[Int], Array[Double])] = {
+graph.vertices.filter(LDA.isDocumentVertex).map { case (docID, 
topicCounts) =>
+  val topIndices = argtopk(topicCounts, k)
+  val sumCounts = sum(topicCounts)
+  val weights = if (sumCounts != 0) {
+topicCounts(topIndices) / sumCounts
+  } else {
+topicCounts(topIndices)
+  }
+  (docID.toLong, topIndices.toArray, weights.toArray)
+}
+  }
+
   // TODO:
   // override def topicDistributions(documents: RDD[(Long, Vector)]): 
RDD[(Long, Vector)] = ???
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4011a947/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index c43e1e5..695ee3b 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.mllib.clustering
 
-import breeze.linalg.{DenseMatrix => BDM, max, argmax}
+import breeze.linalg.{DenseMatrix => BDM, argtopk, max, argmax}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.graphx.Edge
@@ -108,6 +108,17 @@ class LDASuite extends SparkFunSuite with 
MLlibTestSparkContext {
   assert(topicDistribution.toArray.sum ~== 1.0 absTol 1e-5)
 }
 
+val top2TopicsPerDoc = model.topTopicsPerDocument(2).map(t => (t._1, 
(t._2, t._3)))
+model.topicDistributions.join(top2TopicsPerDoc).collect().foreach {
+  case (docId, (topicDistribution, (indices, weights))) =>
+assert(indices.length == 2)
+assert(weights.length == 2)
+val bdvTopicDist = topicDistribution.toBreeze
+val top2Indices = argtopk(bdvTopicDist, 2)
+assert(top2Indices.toArray === indices)
+assert(bdvTopicDist(top2Indices).toArray === weights)
+}
+
 // Check: log probabilities
 assert(model.logLikelihood < 0.0)
 assert(model.logPrior < 0.0)


-
To unsubscribe, 

spark git commit: [SPARK-9471] [ML] Multilayer Perceptron

2015-07-31 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 0024da915 -> 6add4eddb


[SPARK-9471] [ML] Multilayer Perceptron

This pull request contains the following feature for ML:
   - Multilayer Perceptron classifier

This implementation is based on our initial pull request with bgreeven: 
https://github.com/apache/spark/pull/1290 and inspired by very insightful 
suggestions from mengxr and witgo (I would like to thank all other people from 
the mentioned thread for useful discussions). The original code was extensively 
tested and benchmarked. Since then, I've addressed two main requirements that 
prevented the code from merging into the main branch:
   - Extensible interface, so it will be easy to implement new types of networks
     - The main building blocks are the traits `Layer` and `LayerModel`, used for 
constructing the layers of the ANN. New layers can be added by extending the 
`Layer` and `LayerModel` traits. These traits are private in this release, to 
leave room to improve them based on community feedback
     - Back propagation is implemented in a general form, so there is no need to 
change it (the optimization algorithm) when new layers are implemented
   - Speed and scalability: this implementation has to be comparable in speed to 
state-of-the-art single-node implementations.
     - The benchmark developed for large ANNs shows that the proposed code is on 
par with a C++ CPU implementation and scales nicely with the number of workers. 
Details can be found here: https://github.com/avulanov/ann-benchmark

   - DBN and RBM by witgo 
https://github.com/witgo/spark/tree/ann-interface-gemm-dbn
   - Dropout https://github.com/avulanov/spark/tree/ann-interface-gemm

mengxr and dbtsai kindly agreed to perform code review.
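
For orientation, a minimal usage sketch against the 1.5-era DataFrame API; the layer sizes and the `training`/`test` DataFrames (assumed to have "features" and "label" columns) are placeholders for this example:

```scala
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.sql.DataFrame

// `training` and `test` are assumed DataFrames with "features" (Vector) and "label" (Double) columns.
def trainAndPredict(training: DataFrame, test: DataFrame): DataFrame = {
  val trainer = new MultilayerPerceptronClassifier()
    .setLayers(Array(4, 5, 3))   // input size, one hidden layer of 5 units, 3 output classes
    .setBlockSize(100)           // the default block size mentioned in the squashed commits below
    .setSeed(11L)
    .setMaxIter(100)
  val model = trainer.fit(training)
  model.transform(test)          // adds a "prediction" column
}
```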

Author: Alexander Ulanov 
Author: Bert Greevenbosch 

Closes #7621 from avulanov/SPARK-2352-ann and squashes the following commits:

4806b6f [Alexander Ulanov] Addressing reviewers comments.
a7e7951 [Alexander Ulanov] Default blockSize: 100. Added documentation to 
blockSize parameter and DataStacker class
f69bb3d [Alexander Ulanov] Addressing reviewers comments.
374bea6 [Alexander Ulanov] Moving ANN to ML package. GradientDescent 
constructor is now spark private.
43b0ae2 [Alexander Ulanov] Addressing reviewers comments. Adding multiclass 
test.
9d18469 [Alexander Ulanov] Addressing reviewers comments: unnecessary copy of 
data in predict
35125ab [Alexander Ulanov] Style fix in tests
e191301 [Alexander Ulanov] Apache header
a226133 [Alexander Ulanov] Multilayer Perceptron regressor and classifier


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6add4edd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6add4edd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6add4edd

Branch: refs/heads/master
Commit: 6add4eddb39e7748a87da3e921ea3c7881d30a82
Parents: 0024da9
Author: Alexander Ulanov 
Authored: Fri Jul 31 11:22:40 2015 -0700
Committer: Xiangrui Meng 
Committed: Fri Jul 31 11:23:30 2015 -0700

--
 .../org/apache/spark/ml/ann/BreezeUtil.scala|  63 ++
 .../scala/org/apache/spark/ml/ann/Layer.scala   | 882 +++
 .../MultilayerPerceptronClassifier.scala| 193 
 .../org/apache/spark/ml/param/params.scala  |   5 +
 .../mllib/optimization/GradientDescent.scala|   2 +-
 .../org/apache/spark/ml/ann/ANNSuite.scala  |  91 ++
 .../MultilayerPerceptronClassifierSuite.scala   |  91 ++
 7 files changed, 1326 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6add4edd/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala 
b/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala
new file mode 100644
index 000..7429f9d
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apach

spark git commit: [SQL] address comments for to_date/trunc

2015-07-31 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 27ae851ce -> 0024da915


[SQL] address comments for to_date/trunc

This PR addresses the comments in #7805

cc rxin

Author: Davies Liu 

Closes #7817 from davies/trunc and squashes the following commits:

f729d5f [Davies Liu] rollback
cb7f7832 [Davies Liu] genCode() is protected
31e52ef [Davies Liu] fix style
ed1edc7 [Davies Liu] address comments for #7805


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0024da91
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0024da91
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0024da91

Branch: refs/heads/master
Commit: 0024da9157ba12ec84883a78441fa6835c1d0042
Parents: 27ae851
Author: Davies Liu 
Authored: Fri Jul 31 11:07:34 2015 -0700
Committer: Davies Liu 
Committed: Fri Jul 31 11:07:34 2015 -0700

--
 .../sql/catalyst/expressions/datetimeFunctions.scala | 15 ---
 .../spark/sql/catalyst/util/DateTimeUtils.scala  |  3 ++-
 .../catalyst/expressions/ExpressionEvalHelper.scala  |  4 +---
 .../main/scala/org/apache/spark/sql/functions.scala  |  3 +++
 4 files changed, 14 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0024da91/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
index 6e76133..07dea5b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
@@ -726,15 +726,16 @@ case class TruncDate(date: Expression, format: Expression)
   override def dataType: DataType = DateType
   override def prettyName: String = "trunc"
 
-  lazy val minItemConst = 
DateTimeUtils.parseTruncLevel(format.eval().asInstanceOf[UTF8String])
+  private lazy val truncLevel: Int =
+DateTimeUtils.parseTruncLevel(format.eval().asInstanceOf[UTF8String])
 
   override def eval(input: InternalRow): Any = {
-val minItem = if (format.foldable) {
-  minItemConst
+val level = if (format.foldable) {
+  truncLevel
 } else {
   DateTimeUtils.parseTruncLevel(format.eval().asInstanceOf[UTF8String])
 }
-if (minItem == -1) {
+if (level == -1) {
   // unknown format
   null
 } else {
@@ -742,7 +743,7 @@ case class TruncDate(date: Expression, format: Expression)
   if (d == null) {
 null
   } else {
-DateTimeUtils.truncDate(d.asInstanceOf[Int], minItem)
+DateTimeUtils.truncDate(d.asInstanceOf[Int], level)
   }
 }
   }
@@ -751,7 +752,7 @@ case class TruncDate(date: Expression, format: Expression)
 val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
 
 if (format.foldable) {
-  if (minItemConst == -1) {
+  if (truncLevel == -1) {
 s"""
   boolean ${ev.isNull} = true;
   ${ctx.javaType(dataType)} ${ev.primitive} = 
${ctx.defaultValue(dataType)};
@@ -763,7 +764,7 @@ case class TruncDate(date: Expression, format: Expression)
   boolean ${ev.isNull} = ${d.isNull};
   ${ctx.javaType(dataType)} ${ev.primitive} = 
${ctx.defaultValue(dataType)};
   if (!${ev.isNull}) {
-${ev.primitive} = $dtu.truncDate(${d.primitive}, $minItemConst);
+${ev.primitive} = $dtu.truncDate(${d.primitive}, $truncLevel);
   }
 """
   }
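
A hypothetical usage sketch of the trunc function backed by this expression; `df` and its date column "d" are assumptions for the example. With a literal (and therefore foldable) format, the codegen branch above can bake the parsed trunc level in as a constant:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.trunc

// `df` is assumed to contain a DateType column named "d".
def truncateDates(df: DataFrame): DataFrame =
  df.select(
    trunc(df("d"), "year"),   // first day of the year
    trunc(df("d"), "mm")      // first day of the month
  )
```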

http://git-wip-us.apache.org/repos/asf/spark/blob/0024da91/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 5a7c25b..032ed8a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -794,7 +794,8 @@ object DateTimeUtils {
 } else if (level == TRUNC_TO_MONTH) {
   d - DateTimeUtils.getDayOfMonth(d) + 1
 } else {
-  throw new Exception(s"Invalid trunc level: $level")
+  // caller make sure that this should never be reached
+  sys.error(s"Invalid trunc level: $level")
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0024da91/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
---

spark git commit: [SPARK-9446] Clear Active SparkContext in stop() method

2015-07-31 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 04a49edfd -> 27ae851ce


[SPARK-9446] Clear Active SparkContext in stop() method

In the thread 'stopped SparkContext remaining active' on the mailing list, Andres 
observed the following in the driver log:
```
15/07/29 15:17:09 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: 
ApplicationMaster has disassociated: 
15/07/29 15:17:09 INFO YarnClientSchedulerBackend: Shutting down all executors
Exception in thread "Yarn application state monitor" 
org.apache.spark.SparkException: Error asking standalone scheduler to shut down 
executors
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:261)
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:266)
at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.stop(YarnClientSchedulerBackend.scala:158)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:416)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1411)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1644)
at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:139)
Caused by: java.lang.InterruptedException
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)15/07/29 15:17:09 
INFO YarnClientSchedulerBackend: Asking each executor to shut down

at 
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
at 
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:257)
... 6 more
```
The effect of the above exception is that a stopped SparkContext is returned to 
the user, since SparkContext.clearActiveContext() is not called.
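
The fix can be sketched in isolation as the "guard each cleanup step independently" pattern below (a simplified stand-in, not the actual SparkContext or Utils code):

```scala
import scala.util.control.NonFatal

object StopSketch {
  // Simplified stand-in for Utils.tryLogNonFatalError: run a cleanup step and
  // report any non-fatal failure instead of letting it escape.
  def tryLogNonFatalError(block: => Unit): Unit =
    try block catch { case NonFatal(e) => println(s"Ignoring error during stop: $e") }

  def main(args: Array[String]): Unit = {
    var contextActive = true
    // Each step is guarded independently, so a failure like the RPC shutdown error
    // above cannot prevent later steps or the final bookkeeping from running.
    tryLogNonFatalError { println("posting application end") }
    tryLogNonFatalError { sys.error("asking executors to shut down failed") }
    tryLogNonFatalError { println("stopping the UI") }
    contextActive = false   // analogous to SparkContext.clearActiveContext(): always reached
    println(s"contextActive = $contextActive")
  }
}
```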

Author: tedyu 

Closes #7756 from tedyu/master and squashes the following commits:

7339ff2 [tedyu] Move null assignment out of tryLogNonFatalError block
6e02cd9 [tedyu] Use Utils.tryLogNonFatalError to guard resource release
f5fb519 [tedyu] Clear Active SparkContext in stop() method using finally


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27ae851c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27ae851c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27ae851c

Branch: refs/heads/master
Commit: 27ae851ce16082775ffbcb5b8fc6bdbe65dc70fc
Parents: 04a49ed
Author: tedyu 
Authored: Fri Jul 31 18:16:55 2015 +0100
Committer: Sean Owen 
Committed: Fri Jul 31 18:16:55 2015 +0100

--
 .../scala/org/apache/spark/SparkContext.scala   | 50 +++-
 1 file changed, 37 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/27ae851c/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ac6ac6c..2d8aa25 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1689,33 +1689,57 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   Utils.removeShutdownHook(_shutdownHookRef)
 }
 
-postApplicationEnd()
-_ui.foreach(_.stop())
+Utils.tryLogNonFatalError {
+  postApplicationEnd()
+}
+Utils.tryLogNonFatalError {
+  _ui.foreach(_.stop())
+}
 if (env != null) {
-  env.metricsSystem.report()
+  Utils.tryLogNonFatalError {
+env.metricsSystem.report()
+  }
 }
 if (metadataCleaner != null) {
-  metadataCleaner.cancel()
+  Utils.tryLogNonFatalError {
+metadataCleaner.cancel()
+  }
+}
+Utils.tryLogNonFatalError {
+  _cleaner.foreach(_.stop())
+}
+Utils.tryLogNonFatalError {
+  _executorAllocationManager.foreach(_.stop())
 }
-_cleaner.foreach(_.stop())
-_executorAllocationManager.foreach(_.stop())
 if (_dagScheduler != null) {
-  _dagScheduler.stop()

spark git commit: [SPARK-9446] Clear Active SparkContext in stop() method

2015-07-31 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 3d6a9214e -> 5ad9f950c


[SPARK-9446] Clear Active SparkContext in stop() method

In the thread 'stopped SparkContext remaining active' on the mailing list, Andres 
observed the following in the driver log:
```
15/07/29 15:17:09 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: 
ApplicationMaster has disassociated: 
15/07/29 15:17:09 INFO YarnClientSchedulerBackend: Shutting down all executors
Exception in thread "Yarn application state monitor" 
org.apache.spark.SparkException: Error asking standalone scheduler to shut down 
executors
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:261)
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:266)
at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.stop(YarnClientSchedulerBackend.scala:158)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:416)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1411)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1644)
at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:139)
Caused by: java.lang.InterruptedException
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)15/07/29 15:17:09 
INFO YarnClientSchedulerBackend: Asking each executor to shut down

at 
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
at 
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:257)
... 6 more
```
The effect of the above exception is that a stopped SparkContext is returned to 
the user, since SparkContext.clearActiveContext() is not called.

Author: tedyu 

Closes #7756 from tedyu/master and squashes the following commits:

7339ff2 [tedyu] Move null assignment out of tryLogNonFatalError block
6e02cd9 [tedyu] Use Utils.tryLogNonFatalError to guard resource release
f5fb519 [tedyu] Clear Active SparkContext in stop() method using finally

(cherry picked from commit 27ae851ce16082775ffbcb5b8fc6bdbe65dc70fc)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ad9f950
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ad9f950
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ad9f950

Branch: refs/heads/branch-1.4
Commit: 5ad9f950c4bd0042d79cdccb5277c10f8412be85
Parents: 3d6a921
Author: tedyu 
Authored: Fri Jul 31 18:16:55 2015 +0100
Committer: Sean Owen 
Committed: Fri Jul 31 18:17:05 2015 +0100

--
 .../scala/org/apache/spark/SparkContext.scala   | 50 +++-
 1 file changed, 37 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5ad9f950/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d499aba..3caae87 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1630,33 +1630,57 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   Utils.removeShutdownHook(_shutdownHookRef)
 }
 
-postApplicationEnd()
-_ui.foreach(_.stop())
+Utils.tryLogNonFatalError {
+  postApplicationEnd()
+}
+Utils.tryLogNonFatalError {
+  _ui.foreach(_.stop())
+}
 if (env != null) {
-  env.metricsSystem.report()
+  Utils.tryLogNonFatalError {
+env.metricsSystem.report()
+  }
 }
 if (metadataCleaner != null) {
-  metadataCleaner.cancel()
+  Utils.tryLogNonFatalError {
+metadataCleaner.cancel()
+  }
+}
+Utils.tryLogNonFatalError {
+  _cleaner.foreach(_.stop())
+}
+Utils.tryLogNonFatalError {
+  _executorAllocationManager.foreach(_.stop())
 }
-_cleaner.foreach(_.stop())
-_ex

spark git commit: [SPARK-9497] [SPARK-9509] [CORE] Use ask instead of askWithRetry

2015-07-31 Thread kayousterhout
Repository: spark
Updated Branches:
  refs/heads/master fc0e57e5a -> 04a49edfd


[SPARK-9497] [SPARK-9509] [CORE] Use ask instead of askWithRetry

`RpcEndpointRef.askWithRetry` throws `SparkException` rather than 
`TimeoutException`. Use `ask` instead, because we don't need to retry here.

Author: zsxwing 

Closes #7824 from zsxwing/SPARK-9497 and squashes the following commits:

7bfc2b4 [zsxwing] Use ask instead of askWithRetry


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04a49edf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04a49edf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04a49edf

Branch: refs/heads/master
Commit: 04a49edfdb606c01fa4f8ae6e730ec4f9bd0cb6d
Parents: fc0e57e
Author: zsxwing 
Authored: Fri Jul 31 09:34:10 2015 -0700
Committer: Kay Ousterhout 
Committed: Fri Jul 31 09:34:16 2015 -0700

--
 .../main/scala/org/apache/spark/deploy/client/AppClient.scala   | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/04a49edf/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 79b251e..a659abf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -27,7 +27,7 @@ import org.apache.spark.deploy.{ApplicationDescription, 
ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.rpc._
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}
 
 /**
  * Interface allowing applications to speak with a Spark deploy cluster. Takes 
a master URL,
@@ -248,7 +248,8 @@ private[spark] class AppClient(
   def stop() {
 if (endpoint != null) {
   try {
-endpoint.askWithRetry[Boolean](StopAppClient)
+val timeout = RpcUtils.askRpcTimeout(conf)
+timeout.awaitResult(endpoint.ask[Boolean](StopAppClient))
   } catch {
 case e: TimeoutException =>
   logInfo("Stop request to Master timed out; it may already be shut 
down.")


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-9053] [SPARKR] Fix spaces around parens, infix operators etc.

2015-07-31 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master 6bba7509a -> fc0e57e5a


[SPARK-9053] [SPARKR] Fix spaces around parens, infix operators etc.

### JIRA
[[SPARK-9053] Fix spaces around parens, infix operators etc. - ASF 
JIRA](https://issues.apache.org/jira/browse/SPARK-9053)

### The Result of `lint-r`
[The result of lint-r at the 
rivision:a4c83cb1e4b066cd60264b6572fd3e51d160d26a](https://gist.github.com/yu-iskw/d253d7f8ef351f86443d)

Author: Yu ISHIKAWA 

Closes #7584 from yu-iskw/SPARK-9053 and squashes the following commits:

613170f [Yu ISHIKAWA] Ignore a warning about a space before a left parentheses
ede61e1 [Yu ISHIKAWA] Ignores two warnings about a space before a left 
parentheses. TODO: After updating `lintr`, we will remove the ignores
de3e0db [Yu ISHIKAWA] Add '## nolint start' & '## nolint end' statement to 
ignore infix space warnings
e233ea8 [Yu ISHIKAWA] [SPARK-9053][SparkR] Fix spaces around parens, infix 
operators etc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc0e57e5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc0e57e5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc0e57e5

Branch: refs/heads/master
Commit: fc0e57e5aba82a3f227fef05a843283e2ec893fc
Parents: 6bba750
Author: Yu ISHIKAWA 
Authored: Fri Jul 31 09:33:38 2015 -0700
Committer: Shivaram Venkataraman 
Committed: Fri Jul 31 09:33:38 2015 -0700

--
 R/pkg/R/DataFrame.R | 4 
 R/pkg/R/RDD.R   | 7 +--
 R/pkg/R/column.R| 2 +-
 R/pkg/R/context.R   | 2 +-
 R/pkg/R/pairRDD.R   | 2 +-
 R/pkg/R/utils.R | 4 ++--
 R/pkg/inst/tests/test_binary_function.R | 2 +-
 R/pkg/inst/tests/test_rdd.R | 6 +++---
 R/pkg/inst/tests/test_sparkSQL.R| 4 +++-
 9 files changed, 21 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fc0e57e5/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index f4c93d3..b31ad37 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1322,9 +1322,11 @@ setMethod("write.df",
 "org.apache.spark.sql.parquet")
 }
 allModes <- c("append", "overwrite", "error", "ignore")
+# nolint start
 if (!(mode %in% allModes)) {
   stop('mode should be one of "append", "overwrite", "error", 
"ignore"')
 }
+# nolint end
 jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", 
"saveMode", mode)
 options <- varargsToEnv(...)
 if (!is.null(path)) {
@@ -1384,9 +1386,11 @@ setMethod("saveAsTable",
 "org.apache.spark.sql.parquet")
 }
 allModes <- c("append", "overwrite", "error", "ignore")
+# nolint start
 if (!(mode %in% allModes)) {
   stop('mode should be one of "append", "overwrite", "error", 
"ignore"')
 }
+# nolint end
 jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", 
"saveMode", mode)
 options <- varargsToEnv(...)
 callJMethod(df@sdf, "saveAsTable", tableName, source, jmode, 
options)

http://git-wip-us.apache.org/repos/asf/spark/blob/fc0e57e5/R/pkg/R/RDD.R
--
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index d2d0967..2a013b3 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -85,7 +85,9 @@ setMethod("initialize", "PipelinedRDD", function(.Object, 
prev, func, jrdd_val)
 
   isPipelinable <- function(rdd) {
 e <- rdd@env
+# nolint start
 !(e$isCached || e$isCheckpointed)
+# nolint end
   }
 
   if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
@@ -97,7 +99,8 @@ setMethod("initialize", "PipelinedRDD", function(.Object, 
prev, func, jrdd_val)
 # prev_serializedMode is used during the delayed computation of JRDD in 
getJRDD
   } else {
 pipelinedFunc <- function(partIndex, part) {
-  func(partIndex, prev@func(partIndex, part))
+  f <- prev@func
+  func(partIndex, f(partIndex, part))
 }
 .Object@func <- cleanClosure(pipelinedFunc)
 .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
@@ -841,7 +844,7 @@ setMethod("sampleRDD",
 if (withReplacement) {
   count <- rpois(1, fraction)
   if (count > 0) {
-res[(len + 1):(len + count)] <- rep(list(elem), count)
+res[ (len + 1) : (len + count) ] <- rep(list(elem), count)
 len <- len + count
   }
 

spark git commit: [SPARK-9500] add TernaryExpression to simplify ternary expressions

2015-07-31 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master a3a85d73d -> 6bba7509a


[SPARK-9500] add TernaryExpression to simplify ternary expressions

There is a lot of duplicated code among ternary expressions; create a 
TernaryExpression base class for them to reduce the duplication.

cc chenghao-intel
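
A simplified, standalone sketch of the idea (not Catalyst's actual Expression hierarchy): the base class performs the three null checks once, so a concrete ternary expression only implements the null-safe computation.

```scala
// Toy expression hierarchy for illustration only.
trait SimpleExpr { def eval(): Any }

case class Lit(value: Any) extends SimpleExpr { override def eval(): Any = value }

abstract class TernaryExprSketch(e1: SimpleExpr, e2: SimpleExpr, e3: SimpleExpr) extends SimpleExpr {
  // Evaluate to null as soon as any input is null; otherwise delegate.
  override def eval(): Any = {
    val v1 = e1.eval()
    if (v1 != null) {
      val v2 = e2.eval()
      if (v2 != null) {
        val v3 = e3.eval()
        if (v3 != null) return nullSafeEval(v1, v2, v3)
      }
    }
    null
  }
  protected def nullSafeEval(v1: Any, v2: Any, v3: Any): Any
}

// A substring-like ternary expression only needs the null-safe part.
case class SubstrSketch(str: SimpleExpr, pos: SimpleExpr, len: SimpleExpr)
    extends TernaryExprSketch(str, pos, len) {
  override protected def nullSafeEval(s: Any, p: Any, l: Any): Any = {
    val start = p.asInstanceOf[Int]
    s.toString.slice(start, start + l.asInstanceOf[Int])
  }
}

// SubstrSketch(Lit("spark"), Lit(1), Lit(3)).eval() == "par"
// SubstrSketch(Lit(null),    Lit(1), Lit(3)).eval() == null
```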

Author: Davies Liu 

Closes #7816 from davies/ternary and squashes the following commits:

ed2bf76 [Davies Liu] add TernaryExpression


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6bba7509
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6bba7509
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6bba7509

Branch: refs/heads/master
Commit: 6bba7509a932aa4d39266df2d15b1370b7aabbec
Parents: a3a85d7
Author: Davies Liu 
Authored: Fri Jul 31 08:28:05 2015 -0700
Committer: Davies Liu 
Committed: Fri Jul 31 08:28:05 2015 -0700

--
 .../sql/catalyst/expressions/Expression.scala   |  85 +
 .../expressions/codegen/CodeGenerator.scala |   2 +-
 .../spark/sql/catalyst/expressions/math.scala   |  66 +---
 .../catalyst/expressions/stringOperations.scala | 356 +--
 4 files changed, 183 insertions(+), 326 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6bba7509/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 8fc1826..2842b3e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -432,3 +432,88 @@ abstract class BinaryOperator extends BinaryExpression 
with ExpectsInputTypes {
 private[sql] object BinaryOperator {
   def unapply(e: BinaryOperator): Option[(Expression, Expression)] = 
Some((e.left, e.right))
 }
+
+/**
+ * An expression with three inputs and one output. The output is by default 
evaluated to null
+ * if any input is evaluated to null.
+ */
+abstract class TernaryExpression extends Expression {
+
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  /**
+   * Default behavior of evaluation according to the default nullability of 
BinaryExpression.
+   * If subclass of BinaryExpression override nullable, probably should also 
override this.
+   */
+  override def eval(input: InternalRow): Any = {
+val exprs = children
+val value1 = exprs(0).eval(input)
+if (value1 != null) {
+  val value2 = exprs(1).eval(input)
+  if (value2 != null) {
+val value3 = exprs(2).eval(input)
+if (value3 != null) {
+  return nullSafeEval(value1, value2, value3)
+}
+  }
+}
+null
+  }
+
+  /**
+   * Called by default [[eval]] implementation.  If subclass of 
BinaryExpression keep the default
+   * nullability, they can override this method to save null-check code.  If 
we need full control
+   * of evaluation process, we should override [[eval]].
+   */
+  protected def nullSafeEval(input1: Any, input2: Any, input3: Any): Any =
+sys.error(s"BinaryExpressions must override either eval or nullSafeEval")
+
+  /**
+   * Short hand for generating binary evaluation code.
+   * If either of the sub-expressions is null, the result of this computation
+   * is assumed to be null.
+   *
+   * @param f accepts two variable names and returns Java code to compute the 
output.
+   */
+  protected def defineCodeGen(
+ctx: CodeGenContext,
+ev: GeneratedExpressionCode,
+f: (String, String, String) => String): String = {
+nullSafeCodeGen(ctx, ev, (eval1, eval2, eval3) => {
+  s"${ev.primitive} = ${f(eval1, eval2, eval3)};"
+})
+  }
+
+  /**
+   * Short hand for generating binary evaluation code.
+   * If either of the sub-expressions is null, the result of this computation
+   * is assumed to be null.
+   *
+   * @param f function that accepts the 2 non-null evaluation result names of 
children
+   *  and returns Java code to compute the output.
+   */
+  protected def nullSafeCodeGen(
+ctx: CodeGenContext,
+ev: GeneratedExpressionCode,
+f: (String, String, String) => String): String = {
+val evals = children.map(_.gen(ctx))
+val resultCode = f(evals(0).primitive, evals(1).primitive, 
evals(2).primitive)
+s"""
+  ${evals(0).code}
+  boolean ${ev.isNull} = true;
+  ${ctx.javaType(dataType)} ${ev.primitive} = 
${ctx.defaultValue(dataType)};
+  if (!${evals(0).isNull}) {
+${evals(1).code}
+if (!${evals(1).isNull}) {
+  ${evals(2)