spark git commit: [SPARK-9517][SQL] BytesToBytesMap should encode data the same way as UnsafeExternalSorter
Repository: spark Updated Branches: refs/heads/master 67ad4e21f -> d90f2cf7a [SPARK-9517][SQL] BytesToBytesMap should encode data the same way as UnsafeExternalSorter BytesToBytesMap currently encodes key/value data in the following format: ``` 8B key length, key data, 8B value length, value data ``` UnsafeExternalSorter, on the other hand, encodes data this way: ``` 4B record length, data ``` As a result, we cannot pass records encoded by BytesToBytesMap directly into UnsafeExternalSorter for sorting. However, if we rearrange data slightly, we can then pass the key/value records directly into UnsafeExternalSorter: ``` 4B key+value length, 4B key length, key data, value data ``` Author: Reynold Xin Closes #7845 from rxin/kvsort-rebase and squashes the following commits: 5716b59 [Reynold Xin] Fixed test. 2e62ccb [Reynold Xin] Updated BytesToBytesMap's data encoding to put the key first. a51b641 [Reynold Xin] Added a KV sorter interface. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d90f2cf7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d90f2cf7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d90f2cf7 Branch: refs/heads/master Commit: d90f2cf7a2a1d1e69f9ab385f35f62d4091b5302 Parents: 67ad4e2 Author: Reynold Xin Authored: Fri Jul 31 23:55:16 2015 -0700 Committer: Reynold Xin Committed: Fri Jul 31 23:55:16 2015 -0700 -- .../spark/unsafe/map/BytesToBytesMap.java | 58 .../unsafe/sort/UnsafeExternalSorter.java | 15 .../map/AbstractBytesToBytesMapSuite.java | 6 +- .../sql/execution/UnsafeKeyValueSorter.java | 30 .../UnsafeFixedWidthAggregationMap.java | 73 ++-- .../sql/execution/GeneratedAggregate.scala | 27 +--- .../UnsafeFixedWidthAggregationMapSuite.scala | 27 .../org/apache/spark/unsafe/KVIterator.java | 29 8 files changed, 175 insertions(+), 90 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d90f2cf7/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java -- diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 0f42950..481375f 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -17,7 +17,6 @@ package org.apache.spark.unsafe.map; -import java.io.IOException; import java.lang.Override; import java.lang.UnsupportedOperationException; import java.util.Iterator; @@ -212,7 +211,7 @@ public final class BytesToBytesMap { */ public int numElements() { return numElements; } - private static final class BytesToBytesMapIterator implements Iterator { + public static final class BytesToBytesMapIterator implements Iterator { private final int numRecords; private final Iterator dataPagesIterator; @@ -222,7 +221,8 @@ public final class BytesToBytesMap { private Object pageBaseObject; private long offsetInPage; -BytesToBytesMapIterator(int numRecords, Iterator dataPagesIterator, Location loc) { +private BytesToBytesMapIterator( +int numRecords, Iterator dataPagesIterator, Location loc) { this.numRecords = numRecords; this.dataPagesIterator = dataPagesIterator; this.loc = loc; @@ -244,13 +244,13 @@ public final class BytesToBytesMap { @Override public Location next() { - int keyLength = (int) PlatformDependent.UNSAFE.getLong(pageBaseObject, offsetInPage); - if (keyLength == END_OF_PAGE_MARKER) { + int totalLength = PlatformDependent.UNSAFE.getInt(pageBaseObject, offsetInPage); + if
(totalLength == END_OF_PAGE_MARKER) { advanceToNextPage(); -keyLength = (int) PlatformDependent.UNSAFE.getLong(pageBaseObject, offsetInPage); +totalLength = PlatformDependent.UNSAFE.getInt(pageBaseObject, offsetInPage); } loc.with(pageBaseObject, offsetInPage); - offsetInPage += 8 + 8 + keyLength + loc.getValueLength(); + offsetInPage += 8 + totalLength; currentRecordNumber++; return loc; } @@ -269,7 +269,7 @@ public final class BytesToBytesMap { * If any other lookups or operations are performed on this map while iterating over it, including * `lookup()`, the behavior of the returned iterator is undefined. */ - public Iterator iterator() { + public BytesToBytesMapIterator iterator() { return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc); } @@ -352,15 +352,18 @@ public final class BytesToBytesMap { taskMemoryManager.getOffsetInPage(fullKeyAddress
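For illustration, here is a minimal sketch (not Spark code) of what writing one record in the new layout looks like, using a plain `java.nio.ByteBuffer` in place of Spark's unsafe memory pages; `writeRecord` and its arguments are hypothetical:

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical sketch of the new BytesToBytesMap record layout.
def writeRecord(page: ByteBuffer, key: Array[Byte], value: Array[Byte]): Unit = {
  page.order(ByteOrder.nativeOrder())
  page.putInt(key.length + value.length) // 4B key+value length
  page.putInt(key.length)                // 4B key length
  page.put(key)                          // key data
  page.put(value)                        // value data
}
```

The leading 4-byte total length gives each record the same `4B record length, data` framing that UnsafeExternalSorter expects, which is what lets the map's records be fed to the sorter without re-encoding.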
spark git commit: [SPARK-8232] [SQL] Add sort_array support
Repository: spark Updated Branches: refs/heads/master 3320b0ba2 -> 67ad4e21f [SPARK-8232] [SQL] Add sort_array support Add expression `sort_array` support. Author: Cheng Hao This patch had conflicts when merged, resolved by Committer: Davies Liu Closes #7581 from chenghao-intel/sort_array and squashes the following commits: 664c960 [Cheng Hao] update the sort_array by using the ArrayData 276d2d5 [Cheng Hao] add empty line 0edab9c [Cheng Hao] Add asending/descending support for sort_array 80fc0f8 [Cheng Hao] Add type checking a42b678 [Cheng Hao] Add sort_array support Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67ad4e21 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67ad4e21 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67ad4e21 Branch: refs/heads/master Commit: 67ad4e21fc68336b0ad6f9a363fb5ebb51f592bf Parents: 3320b0b Author: Cheng Hao Authored: Fri Jul 31 23:11:22 2015 -0700 Committer: Davies Liu Committed: Fri Jul 31 23:11:22 2015 -0700 -- python/pyspark/sql/functions.py | 20 + .../catalyst/analysis/FunctionRegistry.scala| 1 + .../expressions/collectionOperations.scala | 80 +++- .../expressions/CollectionFunctionsSuite.scala | 22 ++ .../scala/org/apache/spark/sql/functions.scala | 19 - .../spark/sql/DataFrameFunctionsSuite.scala | 51 - 6 files changed, 186 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/67ad4e21/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 89a2a5c..fb542e6 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -51,6 +51,7 @@ __all__ = [ 'sha1', 'sha2', 'size', +'sort_array', 'sparkPartitionId', 'struct', 'udf', @@ -570,8 +571,10 @@ def length(col): def format_number(col, d): """Formats the number X to a format like '#,###,###.##', rounded to d decimal places, and returns the result as a string. + :param col: the column name of the numeric value to be formatted :param d: the N decimal places + >>> sqlContext.createDataFrame([(5,)], ['a']).select(format_number('a', 4).alias('v')).collect() [Row(v=u'5.')] """ @@ -968,6 +971,23 @@ def soundex(col): return Column(sc._jvm.functions.size(_to_java_column(col))) +@since(1.5) +def sort_array(col, asc=True): +""" +Collection function: sorts the input array for the given column in ascending order. 
+ +:param col: name of column or expression + +>>> df = sqlContext.createDataFrame([([2, 1, 3],),([1],),([],)], ['data']) +>>> df.select(sort_array(df.data).alias('r')).collect() +[Row(r=[1, 2, 3]), Row(r=[1]), Row(r=[])] +>>> df.select(sort_array(df.data, asc=False).alias('r')).collect() +[Row(r=[3, 2, 1]), Row(r=[1]), Row(r=[])] + """ +sc = SparkContext._active_spark_context +return Column(sc._jvm.functions.sort_array(_to_java_column(col), asc)) + + class UserDefinedFunction(object): """ User defined function in Python http://git-wip-us.apache.org/repos/asf/spark/blob/67ad4e21/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index ee44cbc..6e14451 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -233,6 +233,7 @@ object FunctionRegistry { // collection functions expression[Size]("size"), +expression[SortArray]("sort_array"), // misc functions expression[Crc32]("crc32"), http://git-wip-us.apache.org/repos/asf/spark/blob/67ad4e21/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index 1a00dbc..0a53059 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -16,7 +16,10 @@ */ package org.apache.spark.sql.catalyst.expres
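As a hedged usage sketch, the matching Scala API added to `functions.scala` in this commit can be exercised like this (a 1.5-era `sqlContext` entry point is assumed to be in scope):

```scala
import org.apache.spark.sql.functions.sort_array

val df = sqlContext.createDataFrame(Seq(
  Tuple1(Seq(2, 1, 3)), Tuple1(Seq(1)), Tuple1(Seq.empty[Int])
)).toDF("data")

df.select(sort_array(df("data"))).show()              // [1,2,3], [1], []
df.select(sort_array(df("data"), asc = false)).show() // [3,2,1], [1], []
```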
spark git commit: [SPARK-9415][SQL] Throw AnalysisException when using MapType on Join and Aggregate
Repository: spark Updated Branches: refs/heads/master 14f263448 -> 3320b0ba2 [SPARK-9415][SQL] Throw AnalysisException when using MapType on Join and Aggregate JIRA: https://issues.apache.org/jira/browse/SPARK-9415 Following up #7787. We shouldn't use MapType as grouping keys and join keys too. Author: Liang-Chi Hsieh Closes #7819 from viirya/map_join_groupby and squashes the following commits: 005ee0c [Liang-Chi Hsieh] For comments. 7463398 [Liang-Chi Hsieh] MapType can't be used as join keys, grouping keys. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3320b0ba Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3320b0ba Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3320b0ba Branch: refs/heads/master Commit: 3320b0ba262159c0c7209ce39b353c93c597077d Parents: 14f2634 Author: Liang-Chi Hsieh Authored: Fri Jul 31 22:26:30 2015 -0700 Committer: Reynold Xin Committed: Fri Jul 31 22:26:30 2015 -0700 -- .../sql/catalyst/analysis/CheckAnalysis.scala | 16 +++-- .../catalyst/analysis/AnalysisErrorSuite.scala | 68 +++- .../spark/sql/DataFrameAggregateSuite.scala | 10 --- .../scala/org/apache/spark/sql/JoinSuite.scala | 8 --- 4 files changed, 77 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3320b0ba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 0ebc3d1..364569d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -92,8 +92,11 @@ trait CheckAnalysis { case p: Predicate => p.asInstanceOf[Expression].children.foreach(checkValidJoinConditionExprs) case e if e.dataType.isInstanceOf[BinaryType] => -failAnalysis(s"expression ${e.prettyString} in join condition " + - s"'${condition.prettyString}' can't be binary type.") +failAnalysis(s"binary type expression ${e.prettyString} cannot be used " + + "in join conditions") + case e if e.dataType.isInstanceOf[MapType] => +failAnalysis(s"map type expression ${e.prettyString} cannot be used " + + "in join conditions") case _ => // OK } @@ -114,13 +117,16 @@ trait CheckAnalysis { def checkValidGroupingExprs(expr: Expression): Unit = expr.dataType match { case BinaryType => -failAnalysis(s"grouping expression '${expr.prettyString}' in aggregate can " + - s"not be binary type.") +failAnalysis(s"binary type expression ${expr.prettyString} cannot be used " + + "in grouping expression") + case m: MapType => +failAnalysis(s"map type expression ${expr.prettyString} cannot be used " + + "in grouping expression") case _ => // OK } aggregateExprs.foreach(checkValidAggregateExpression) -aggregateExprs.foreach(checkValidGroupingExprs) +groupingExprs.foreach(checkValidGroupingExprs) case Sort(orders, _, _) => orders.foreach { order => http://git-wip-us.apache.org/repos/asf/spark/blob/3320b0ba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 2588df9..aa19cdc 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -181,7 +181,71 @@ class AnalysisErrorSuite extends SparkFunSuite with BeforeAndAfter { val error = intercept[AnalysisException] { SimpleAnalyzer.checkAnalysis(join) } -error.message.contains("Failure when resolving conflicting references in Join") -error.message.contains("Conflicting attributes") +assert(error.message.contains("Failure when resolving conflicting references in Join")) +assert(error.message.contains("Conflicting attributes")) + } + + test("aggregation can't work on binary and map types") { +va
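A hedged sketch of what this check now rejects; the DataFrames and the MapType column `m` are hypothetical, and the error messages are abbreviated from the `CheckAnalysis` diff above:

```scala
// Grouping on a map column now fails analysis up front:
df.groupBy(df("m")).count()
// AnalysisException: map type expression m cannot be used in grouping expression

// So does joining on a map column:
left.join(right, left("m") === right("m"))
// AnalysisException: map type expression m cannot be used in join conditions
```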
spark git commit: [SPARK-9464][SQL] Property checks for UTF8String
Repository: spark Updated Branches: refs/heads/master 6996bd2e8 -> 14f263448 [SPARK-9464][SQL] Property checks for UTF8String This PR is based on the original work by JoshRosen in #7780, which adds ScalaCheck property-based tests for UTF8String. Author: Josh Rosen Author: Yijie Shen Closes #7830 from yjshen/utf8-property-checks and squashes the following commits: 593da3a [Yijie Shen] resolve comments c0800e6 [Yijie Shen] Finish all todos in suite 52f51a0 [Josh Rosen] Add some more failing tests 49ed0697 [Josh Rosen] Rename suite 9209c64 [Josh Rosen] UTF8String Property Checks. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14f26344 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14f26344 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14f26344 Branch: refs/heads/master Commit: 14f263448471f182123fc84619559df90e7ae52c Parents: 6996bd2 Author: Josh Rosen Authored: Fri Jul 31 21:19:23 2015 -0700 Committer: Reynold Xin Committed: Fri Jul 31 21:19:23 2015 -0700 -- unsafe/pom.xml | 10 + .../apache/spark/unsafe/types/UTF8String.java | 19 +- .../spark/unsafe/types/UTF8StringSuite.java | 13 +- .../types/UTF8StringPropertyCheckSuite.scala| 249 +++ 4 files changed, 280 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/14f26344/unsafe/pom.xml -- diff --git a/unsafe/pom.xml b/unsafe/pom.xml index 33782c6..89475ee 100644 --- a/unsafe/pom.xml +++ b/unsafe/pom.xml @@ -70,6 +70,16 @@ mockito-core test + + org.scalacheck + scalacheck_${scala.binary.version} + test + + + org.apache.commons + commons-lang3 + test + target/scala-${scala.binary.version}/classes http://git-wip-us.apache.org/repos/asf/spark/blob/14f26344/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java -- diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java index 2561c1c..f6dafe9 100644 --- a/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java +++ b/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java @@ -301,10 +301,9 @@ public final class UTF8String implements Comparable, Serializable { int s = 0; int e = this.numBytes - 1; // skip all of the space (0x20) in the left side -while (s < this.numBytes && getByte(s) == 0x20) s++; +while (s < this.numBytes && getByte(s) <= 0x20 && getByte(s) >= 0x00) s++; // skip all of the space (0x20) in the right side -while (e >= 0 && getByte(e) == 0x20) e--; - +while (e >= 0 && getByte(e) <= 0x20 && getByte(e) >= 0x00) e--; if (s > e) { // empty string return UTF8String.fromBytes(new byte[0]); @@ -316,7 +315,7 @@ public final class UTF8String implements Comparable, Serializable { public UTF8String trimLeft() { int s = 0; // skip all of the space (0x20) in the left side -while (s < this.numBytes && getByte(s) == 0x20) s++; +while (s < this.numBytes && getByte(s) <= 0x20 && getByte(s) >= 0x00) s++; if (s == this.numBytes) { // empty string return UTF8String.fromBytes(new byte[0]); @@ -328,7 +327,7 @@ public final class UTF8String implements Comparable, Serializable { public UTF8String trimRight() { int e = numBytes - 1; // skip all of the space (0x20) in the right side -while (e >= 0 && getByte(e) == 0x20) e--; +while (e >= 0 && getByte(e) <= 0x20 && getByte(e) >= 0x00) e--; if (e < 0) { // empty string @@ -354,7 +353,7 @@ public final class UTF8String implements Comparable, Serializable { } public UTF8String repeat(int times) { -if (times <=0) { +if 
(times <= 0) { return EMPTY_UTF8; } @@ -492,7 +491,7 @@ public final class UTF8String implements Comparable, Serializable { */ public UTF8String rpad(int len, UTF8String pad) { int spaces = len - this.numChars(); // number of char need to pad -if (spaces <= 0) { +if (spaces <= 0 || pad.numBytes() == 0) { // no padding at all, return the substring of the current string return substring(0, len); } else { @@ -507,7 +506,7 @@ public final class UTF8String implements Comparable, Serializable { int idx = 0; while (idx < count) { copyMemory(pad.base, pad.offset, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes); -++idx; +++ idx; offset += pad.numBytes; } copyMemory(remain.base, remain.offset, data, BYTE_ARRAY_OFFSET + offset, remain.numB
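A minimal sketch of the property-based style the new suite uses; the suite name here is illustrative, not the actual file. The property below is exactly what the `trim` change above is meant to guarantee: stripping all bytes in `[0x00, 0x20]` matches `java.lang.String.trim` semantics.

```scala
import org.scalatest.FunSuite
import org.scalatest.prop.GeneratorDrivenPropertyChecks
import org.apache.spark.unsafe.types.UTF8String

class UTF8StringTrimSketch extends FunSuite with GeneratorDrivenPropertyChecks {
  test("trim agrees with java.lang.String.trim") {
    forAll { (s: String) =>
      // ScalaCheck generates arbitrary strings, including control characters.
      assert(UTF8String.fromString(s).trim() === UTF8String.fromString(s.trim))
    }
  }
}
```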
spark git commit: [SPARK-8264][SQL]add substring_index function
Repository: spark Updated Branches: refs/heads/master 03377d252 -> 6996bd2e8 [SPARK-8264][SQL]add substring_index function This PR is based on #7533, thanks to zhichao-li Closes #7533 Author: zhichao.li Author: Davies Liu Closes #7843 from davies/str_index and squashes the following commits: 391347b [Davies Liu] add python api 3ce7802 [Davies Liu] fix substringIndex f2d29a1 [Davies Liu] Merge branch 'master' of github.com:apache/spark into str_index 515519b [zhichao.li] add foldable and remove null checking 9546991 [zhichao.li] scala style 67c253a [zhichao.li] hide some apis and clean code b19b013 [zhichao.li] add codegen and clean code ac863e9 [zhichao.li] reduce the calling of numChars 12e108f [zhichao.li] refine unittest d92951b [zhichao.li] add lastIndexOf 52d7b03 [zhichao.li] add substring_index function Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6996bd2e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6996bd2e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6996bd2e Branch: refs/heads/master Commit: 6996bd2e81bf6597dcda499d9a9a80927a43e30f Parents: 03377d2 Author: zhichao.li Authored: Fri Jul 31 21:18:01 2015 -0700 Committer: Reynold Xin Committed: Fri Jul 31 21:18:01 2015 -0700 -- python/pyspark/sql/functions.py | 19 + .../catalyst/analysis/FunctionRegistry.scala| 1 + .../catalyst/expressions/stringOperations.scala | 25 ++ .../expressions/StringExpressionsSuite.scala| 31 .../scala/org/apache/spark/sql/functions.scala | 12 ++- .../apache/spark/sql/StringFunctionsSuite.scala | 57 ++ .../apache/spark/unsafe/types/UTF8String.java | 80 +++- .../spark/unsafe/types/UTF8StringSuite.java | 38 ++ 8 files changed, 261 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6996bd2e/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index bb9926c..89a2a5c 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -921,6 +921,25 @@ def trunc(date, format): @since(1.5) +@ignore_unicode_prefix +def substring_index(str, delim, count): +""" +Returns the substring from string str before count occurrences of the delimiter delim. +If count is positive, everything to the left of the final delimiter (counting from left) is +returned. If count is negative, everything to the right of the final delimiter (counting from the +right) is returned. substring_index performs a case-sensitive match when searching for delim. + +>>> df = sqlContext.createDataFrame([('a.b.c.d',)], ['s']) +>>> df.select(substring_index(df.s, '.', 2).alias('s')).collect() +[Row(s=u'a.b')] +>>> df.select(substring_index(df.s, '.', -3).alias('s')).collect() +[Row(s=u'b.c.d')] +""" +sc = SparkContext._active_spark_context +return Column(sc._jvm.functions.substring_index(_to_java_column(str), delim, count)) + + @since(1.5) def size(col): """ Collection function: returns the length of the array or map stored in the column.
http://git-wip-us.apache.org/repos/asf/spark/blob/6996bd2e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 3f61a9a..ee44cbc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -199,6 +199,7 @@ object FunctionRegistry { expression[StringSplit]("split"), expression[Substring]("substr"), expression[Substring]("substring"), +expression[SubstringIndex]("substring_index"), expression[StringTrim]("trim"), expression[UnBase64]("unbase64"), expression[Upper]("ucase"), http://git-wip-us.apache.org/repos/asf/spark/blob/6996bd2e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala index 160e72f..5dd387a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/
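A hedged usage sketch of the matching Scala function added to `functions.scala` (again assuming a 1.5-era `sqlContext` in scope):

```scala
import org.apache.spark.sql.functions.substring_index

val df = sqlContext.createDataFrame(Seq(Tuple1("a.b.c.d"))).toDF("s")
df.select(substring_index(df("s"), ".", 2)).show()  // "a.b"   (left of the 2nd '.')
df.select(substring_index(df("s"), ".", -3)).show() // "b.c.d" (right of the 3rd '.' from the end)
```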
spark git commit: [SPARK-9358][SQL] Code generation for UnsafeRow joiner.
Repository: spark Updated Branches: refs/heads/master 712f5b7a9 -> 03377d252 [SPARK-9358][SQL] Code generation for UnsafeRow joiner. This patch creates a code-generated unsafe row concatenator that can be used to concatenate/join two UnsafeRows into a single UnsafeRow. Since it is inherently hard to test this low-level stuff, the test suites employ randomized testing heavily in order to guarantee correctness. Author: Reynold Xin Closes #7821 from rxin/rowconcat and squashes the following commits: 8717f35 [Reynold Xin] Rebase and code review. 72c5d8e [Reynold Xin] Fixed a bug. a84ed2e [Reynold Xin] Fixed offset. 40c3fb2 [Reynold Xin] Reset random data generator. f0913aa [Reynold Xin] Test fixes. 6687b6f [Reynold Xin] Updated documentation. 00354b9 [Reynold Xin] Support concat data as well. e9a4347 [Reynold Xin] Updated. 6269f96 [Reynold Xin] Fixed a bug . 0f89716 [Reynold Xin] [SPARK-9358][SQL][WIP] Code generation for UnsafeRow concat. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03377d25 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03377d25 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03377d25 Branch: refs/heads/master Commit: 03377d2522776267a07b7d6ae9bddf79a4e0f516 Parents: 712f5b7 Author: Reynold Xin Authored: Fri Jul 31 21:09:00 2015 -0700 Committer: Reynold Xin Committed: Fri Jul 31 21:09:00 2015 -0700 -- .../sql/catalyst/expressions/UnsafeRow.java | 19 ++ .../expressions/codegen/CodeGenerator.scala | 2 + .../codegen/GenerateUnsafeProjection.scala | 6 +- .../codegen/GenerateUnsafeRowJoiner.scala | 241 +++ .../apache/spark/sql/RandomDataGenerator.scala | 15 +- .../GenerateUnsafeRowJoinerBitsetSuite.scala| 147 +++ .../codegen/GenerateUnsafeRowJoinerSuite.scala | 114 + .../UnsafeFixedWidthAggregationMap.java | 7 +- .../spark/sql/execution/TungstenSortSuite.scala | 3 + 9 files changed, 544 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/03377d25/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java -- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index e7088ed..24dc80b 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -85,6 +85,14 @@ public final class UnsafeRow extends MutableRow { }))); } + public static boolean isFixedLength(DataType dt) { +if (dt instanceof DecimalType) { + return ((DecimalType) dt).precision() < Decimal.MAX_LONG_DIGITS(); +} else { + return settableFieldTypes.contains(dt); +} + } + // // Private fields and methods // @@ -144,6 +152,17 @@ public final class UnsafeRow extends MutableRow { this.sizeInBytes = sizeInBytes; } + /** + * Update this UnsafeRow to point to the underlying byte array.
+ * + * @param buf byte array to point to + * @param numFields the number of fields in this row + * @param sizeInBytes the number of bytes valid in the byte array + */ + public void pointTo(byte[] buf, int numFields, int sizeInBytes) { +pointTo(buf, PlatformDependent.BYTE_ARRAY_OFFSET, numFields, sizeInBytes); + } + private void assertIndexIsValid(int index) { assert index >= 0 : "index (" + index + ") should >= 0"; assert index < numFields : "index (" + index + ") should < " + numFields; http://git-wip-us.apache.org/repos/asf/spark/blob/03377d25/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index e50ec27..36f4e9c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -27,6 +27,7 @@ import org.apache.spark.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.PlatformDependent import org.apache.spark.u
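A hedged sketch of how the generated joiner is meant to be used, with method names taken from the new `GenerateUnsafeRowJoiner` file in the diffstat; `row1` and `row2` are assumed to be `UnsafeRow`s built elsewhere that match the two schemas:

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
import org.apache.spark.sql.types._

val schema1 = StructType(StructField("a", IntegerType) :: Nil)
val schema2 = StructType(StructField("b", LongType) :: Nil)

// create() compiles a concatenator specialized to this pair of schemas.
val joiner = GenerateUnsafeRowJoiner.create(schema1, schema2)
// row1: UnsafeRow, row2: UnsafeRow (assumed in scope)
val joined: UnsafeRow = joiner.join(row1, row2) // one concatenated UnsafeRow
```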
spark git commit: [SPARK-9318] [SPARK-9320] [SPARKR] Aliases for merge and summary functions on DataFrames
Repository: spark Updated Branches: refs/heads/master 8cb415a4b -> 712f5b7a9 [SPARK-9318] [SPARK-9320] [SPARKR] Aliases for merge and summary functions on DataFrames This PR adds synonyms for ```merge``` and ```summary``` in SparkR DataFrame API. cc shivaram Author: Hossein Closes #7806 from falaki/SPARK-9320 and squashes the following commits: 72600f7 [Hossein] Updated docs 92a6e75 [Hossein] Fixed merge generic signature issue 4c2b051 [Hossein] Fixing naming with mllib summary 0f3a64c [Hossein] Added ... to generic for merge 30fbaf8 [Hossein] Merged master ae1a4cf [Hossein] Merge branch 'master' into SPARK-9320 e8eb86f [Hossein] Add a generic for merge fc01f2d [Hossein] Added unit test 8d92012 [Hossein] Added merge as an alias for join 5b8bedc [Hossein] Added unit test 632693d [Hossein] Added summary as an alias for describe for DataFrame Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/712f5b7a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/712f5b7a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/712f5b7a Branch: refs/heads/master Commit: 712f5b7a9ab52c26e3d086629633950ec2fb7afc Parents: 8cb415a Author: Hossein Authored: Fri Jul 31 19:24:00 2015 -0700 Committer: Shivaram Venkataraman Committed: Fri Jul 31 19:24:44 2015 -0700 -- R/pkg/NAMESPACE | 2 ++ R/pkg/R/DataFrame.R | 22 ++ R/pkg/R/generics.R | 8 R/pkg/R/mllib.R | 8 R/pkg/inst/tests/test_sparkSQL.R | 14 -- 5 files changed, 48 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/712f5b7a/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index ff116cb..b2d92bd 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -46,6 +46,7 @@ exportMethods("arrange", "isLocal", "join", "limit", + "merge", "names", "ncol", "nrow", @@ -69,6 +70,7 @@ exportMethods("arrange", "show", "showDF", "summarize", + "summary", "take", "unionAll", "unique", http://git-wip-us.apache.org/repos/asf/spark/blob/712f5b7a/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index b4065d2..8956032 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1279,6 +1279,15 @@ setMethod("join", dataFrame(sdf) }) +#' rdname merge +#' aliases join +setMethod("merge", + signature(x = "DataFrame", y = "DataFrame"), + function(x, y, joinExpr = NULL, joinType = NULL, ...) { +join(x, y, joinExpr, joinType) + }) + + #' UnionAll #' #' Return a new DataFrame containing the union of rows in this DataFrame @@ -1524,6 +1533,19 @@ setMethod("describe", dataFrame(sdf) }) +#' @title Summary +#' +#' @description Computes statistics for numeric columns of the DataFrame +#' +#' @rdname summary +#' @aliases describe +setMethod("summary", + signature(x = "DataFrame"), + function(x) { +describe(x) + }) + + #' dropna #' #' Returns a new DataFrame omitting rows with null values. http://git-wip-us.apache.org/repos/asf/spark/blob/712f5b7a/R/pkg/R/generics.R -- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 71d1e34..c43b947 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -461,6 +461,10 @@ setGeneric("isLocal", function(x) { standardGeneric("isLocal") }) #' @export setGeneric("limit", function(x, num) {standardGeneric("limit") }) +#' rdname merge +#' @export +setGeneric("merge") + #' @rdname withColumn #' @export setGeneric("mutate", function(x, ...) {standardGeneric("mutate") }) @@ -531,6 +535,10 @@ setGeneric("showDF", function(x,...) { standardGeneric("showDF") }) #' @export setGeneric("summarize", function(x,...) 
{ standardGeneric("summarize") }) +##' rdname summary +##' @export +setGeneric("summary", function(x, ...) { standardGeneric("summary") }) + # @rdname tojson # @export setGeneric("toJSON", function(x) { standardGeneric("toJSON") }) http://git-wip-us.apache.org/repos/asf/spark/blob/712f5b7a/R/pkg/R/mllib.R -- diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index efddcc1..b524d1f 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -86,12 +86,12 @@ setMethod("predict", signature(object = "PipelineModel"), #' model <- glm(y ~ x, train
[2/2] spark git commit: [SPARK-9451] [SQL] Support entries larger than default page size in BytesToBytesMap & integrate with ShuffleMemoryManager
[SPARK-9451] [SQL] Support entries larger than default page size in BytesToBytesMap & integrate with ShuffleMemoryManager This patch adds support for entries larger than the default page size in BytesToBytesMap. These large rows are handled by allocating special overflow pages to hold individual entries. In addition, this patch integrates BytesToBytesMap with the ShuffleMemoryManager: - Move BytesToBytesMap from `unsafe` to `core` so that it can import `ShuffleMemoryManager`. - Before allocating new data pages, ask the ShuffleMemoryManager to reserve the memory: - `putNewKey()` now returns a boolean to indicate whether the insert succeeded or failed due to a lack of memory. The caller can use this value to respond to the memory pressure (e.g. by spilling). - `UnsafeFixedWidthAggregationMap.getAggregationBuffer()` now returns `null` to signal failure due to a lack of memory. - Updated all uses of these classes to handle these error conditions. - Added new tests for allocating large records and for allocations which fail due to memory pressure. - Extended the `afterAll()` test teardown methods to detect ShuffleMemoryManager leaks. Author: Josh Rosen Closes #7762 from JoshRosen/large-rows and squashes the following commits: ae7bc56 [Josh Rosen] Fix compilation 82fc657 [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-rows 34ab943 [Josh Rosen] Remove semi 31a525a [Josh Rosen] Integrate BytesToBytesMap with ShuffleMemoryManager. 626b33c [Josh Rosen] Move code to sql/core and spark/core packages so that ShuffleMemoryManager can be integrated ec4484c [Josh Rosen] Move BytesToBytesMap from unsafe package to core. 642ed69 [Josh Rosen] Rename size to numElements bea1152 [Josh Rosen] Add basic test. 2cd3570 [Josh Rosen] Remove accidental duplicated code 07ff9ef [Josh Rosen] Basic support for large rows in BytesToBytesMap.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8cb415a4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8cb415a4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8cb415a4 Branch: refs/heads/master Commit: 8cb415a4b9bc1f82127ccce4a5579d433f4e8f83 Parents: f51fd6f Author: Josh Rosen Authored: Fri Jul 31 19:19:27 2015 -0700 Committer: Josh Rosen Committed: Fri Jul 31 19:19:27 2015 -0700 -- .../spark/unsafe/map/BytesToBytesMap.java | 709 +++ .../spark/unsafe/map/HashMapGrowthStrategy.java | 41 ++ .../spark/shuffle/ShuffleMemoryManager.scala| 8 +- .../map/AbstractBytesToBytesMapSuite.java | 499 + .../unsafe/map/BytesToBytesMapOffHeapSuite.java | 29 + .../unsafe/map/BytesToBytesMapOnHeapSuite.java | 29 + .../UnsafeFixedWidthAggregationMap.java | 223 -- .../UnsafeFixedWidthAggregationMapSuite.scala | 132 .../UnsafeFixedWidthAggregationMap.java | 234 ++ .../sql/execution/GeneratedAggregate.scala | 6 + .../sql/execution/joins/HashedRelation.scala| 27 +- .../UnsafeFixedWidthAggregationMapSuite.scala | 140 .../spark/unsafe/map/BytesToBytesMap.java | 643 - .../spark/unsafe/map/HashMapGrowthStrategy.java | 41 -- .../apache/spark/unsafe/memory/MemoryBlock.java | 2 +- .../spark/unsafe/memory/TaskMemoryManager.java | 1 + .../map/AbstractBytesToBytesMapSuite.java | 385 -- .../unsafe/map/BytesToBytesMapOffHeapSuite.java | 29 - .../unsafe/map/BytesToBytesMapOnHeapSuite.java | 29 - 19 files changed, 1717 insertions(+), 1490 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8cb415a4/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java -- diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java new file mode 100644 index 000..0f42950 --- /dev/null +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -0,0 +1,709 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permission
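A hedged sketch of the new failure-signaling contract described above; `map` and `groupKey` are hypothetical caller-side names:

```scala
// getAggregationBuffer() now returns null when the map cannot reserve page
// memory from the ShuffleMemoryManager, instead of failing deep inside the map.
val buffer = map.getAggregationBuffer(groupKey)
if (buffer == null) {
  // Out of memory: the caller must respond to memory pressure, e.g. by spilling.
}
```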
[1/2] spark git commit: [SPARK-9451] [SQL] Support entries larger than default page size in BytesToBytesMap & integrate with ShuffleMemoryManager
Repository: spark Updated Branches: refs/heads/master f51fd6fbb -> 8cb415a4b http://git-wip-us.apache.org/repos/asf/spark/blob/8cb415a4/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala new file mode 100644 index 000..79fd52d --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.scalatest.{BeforeAndAfterEach, Matchers} + +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.apache.spark.SparkFunSuite +import org.apache.spark.shuffle.ShuffleMemoryManager +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager} +import org.apache.spark.unsafe.types.UTF8String + + +class UnsafeFixedWidthAggregationMapSuite + extends SparkFunSuite + with Matchers + with BeforeAndAfterEach { + + import UnsafeFixedWidthAggregationMap._ + + private val groupKeySchema = StructType(StructField("product", StringType) :: Nil) + private val aggBufferSchema = StructType(StructField("salePrice", IntegerType) :: Nil) + private def emptyAggregationBuffer: InternalRow = InternalRow(0) + private val PAGE_SIZE_BYTES: Long = 1L << 26; // 64 megabytes + + private var taskMemoryManager: TaskMemoryManager = null + private var shuffleMemoryManager: ShuffleMemoryManager = null + + override def beforeEach(): Unit = { +taskMemoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP)) +shuffleMemoryManager = new ShuffleMemoryManager(Long.MaxValue) + } + + override def afterEach(): Unit = { +if (taskMemoryManager != null) { + val leakedShuffleMemory = shuffleMemoryManager.getMemoryConsumptionForThisTask + assert(taskMemoryManager.cleanUpAllAllocatedMemory() === 0) + assert(leakedShuffleMemory === 0) + taskMemoryManager = null +} + } + + test("supported schemas") { +assert(supportsAggregationBufferSchema( + StructType(StructField("x", DecimalType.USER_DEFAULT) :: Nil))) +assert(!supportsAggregationBufferSchema( + StructType(StructField("x", DecimalType.SYSTEM_DEFAULT) :: Nil))) +assert(!supportsAggregationBufferSchema(StructType(StructField("x", StringType) :: Nil))) +assert( + !supportsAggregationBufferSchema(StructType(StructField("x", ArrayType(IntegerType)) :: Nil))) + } + + test("empty map") { +val map = new UnsafeFixedWidthAggregationMap( + emptyAggregationBuffer, + 
aggBufferSchema, + groupKeySchema, + taskMemoryManager, + shuffleMemoryManager, + 1024, // initial capacity, + PAGE_SIZE_BYTES, + false // disable perf metrics +) +assert(!map.iterator().hasNext) +map.free() + } + + test("updating values for a single key") { +val map = new UnsafeFixedWidthAggregationMap( + emptyAggregationBuffer, + aggBufferSchema, + groupKeySchema, + taskMemoryManager, + shuffleMemoryManager, + 1024, // initial capacity + PAGE_SIZE_BYTES, + false // disable perf metrics +) +val groupKey = InternalRow(UTF8String.fromString("cats")) + +// Looking up a key stores a zero-entry in the map (like Python Counters or DefaultDicts) +assert(map.getAggregationBuffer(groupKey) != null) +val iter = map.iterator() +val entry = iter.next() +assert(!iter.hasNext) +entry.key.getString(0) should be ("cats") +entry.value.getInt(0) should be (0) + +// Modifications to rows retrieved from the map should update the values in the map +entry.value.setInt(0, 42) +map.getAggregationBuffer(groupKey).getInt(0) should be (42) + +map.free() + } + + test("inserting large random keys") { +val map = new Un
spark git commit: [SPARK-8936] [MLLIB] OnlineLDA document-topic Dirichlet hyperparameter optimization
Repository: spark Updated Branches: refs/heads/master 4d5a6e7b6 -> f51fd6fbb [SPARK-8936] [MLLIB] OnlineLDA document-topic Dirichlet hyperparameter optimization Adds `alpha` (document-topic Dirichlet parameter) hyperparameter optimization to `OnlineLDAOptimizer` following Huang: Maximum Likelihood Estimation of Dirichlet Distribution Parameters. Also introduces a private `setSampleWithReplacement` to `OnlineLDAOptimizer` for unit testing purposes. Author: Feynman Liang Closes #7836 from feynmanliang/SPARK-8936-alpha-optimize and squashes the following commits: 4bef484 [Feynman Liang] Documentation improvements c3c6c1d [Feynman Liang] Fix docs 151e859 [Feynman Liang] Fix style fa77518 [Feynman Liang] Hyperparameter optimization Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f51fd6fb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f51fd6fb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f51fd6fb Branch: refs/heads/master Commit: f51fd6fbb4d9822502f98b312251e317d757bc3a Parents: 4d5a6e7 Author: Feynman Liang Authored: Fri Jul 31 18:36:22 2015 -0700 Committer: Joseph K. Bradley Committed: Fri Jul 31 18:36:22 2015 -0700 -- .../spark/mllib/clustering/LDAOptimizer.scala | 75 +--- .../spark/mllib/clustering/LDASuite.scala | 34 + 2 files changed, 99 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f51fd6fb/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index d6f8b29..b0e14cb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -19,8 +19,8 @@ package org.apache.spark.mllib.clustering import java.util.Random -import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, normalize, sum} -import breeze.numerics.{abs, exp} +import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, all, normalize, sum} +import breeze.numerics.{trigamma, abs, exp} import breeze.stats.distributions.{Gamma, RandBasis} import org.apache.spark.annotation.DeveloperApi @@ -239,22 +239,26 @@ final class OnlineLDAOptimizer extends LDAOptimizer { /** alias for docConcentration */ private var alpha: Vector = Vectors.dense(0) - /** (private[clustering] for debugging) Get docConcentration */ + /** (for debugging) Get docConcentration */ private[clustering] def getAlpha: Vector = alpha /** alias for topicConcentration */ private var eta: Double = 0 - /** (private[clustering] for debugging) Get topicConcentration */ + /** (for debugging) Get topicConcentration */ private[clustering] def getEta: Double = eta private var randomGenerator: java.util.Random = null + /** (for debugging) Whether to sample mini-batches with replacement. 
(default = true) */ + private var sampleWithReplacement: Boolean = true + // Online LDA specific parameters // Learning rate is: (tau0 + t)^{-kappa} private var tau0: Double = 1024 private var kappa: Double = 0.51 private var miniBatchFraction: Double = 0.05 + private var optimizeAlpha: Boolean = false // internal data structure private var docs: RDD[(Long, Vector)] = null @@ -262,7 +266,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer { /** Dirichlet parameter for the posterior over topics */ private var lambda: BDM[Double] = null - /** (private[clustering] for debugging) Get parameter for topics */ + /** (for debugging) Get parameter for topics */ private[clustering] def getLambda: BDM[Double] = lambda /** Current iteration (count of invocations of [[next()]]) */ @@ -325,7 +329,22 @@ final class OnlineLDAOptimizer extends LDAOptimizer { } /** - * (private[clustering]) + * Optimize alpha, indicates whether alpha (Dirichlet parameter for document-topic distribution) + * will be optimized during training. + */ + def getOptimzeAlpha: Boolean = this.optimizeAlpha + + /** + * Sets whether to optimize alpha parameter during training. + * + * Default: false + */ + def setOptimzeAlpha(optimizeAlpha: Boolean): this.type = { +this.optimizeAlpha = optimizeAlpha +this + } + + /** * Set the Dirichlet parameter for the posterior over topics. * This is only used for testing now. In the future, it can help support training stop/resume. */ @@ -335,7 +354,6 @@ final class OnlineLDAOptimizer extends LDAOptimizer { } /** - * (private[clustering]) * Used for random initialization of the variationa
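A hedged usage sketch; the setter spelling `setOptimzeAlpha` is copied from this patch, and `corpus` is an assumed `RDD[(Long, Vector)]` of documents:

```scala
import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}

// Enable learning of the document-topic Dirichlet parameter (alpha)
// via Huang's MLE update during online training.
val optimizer = new OnlineLDAOptimizer().setOptimzeAlpha(true)
val model = new LDA().setK(10).setOptimizer(optimizer).run(corpus)
```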
spark git commit: [SPARK-8271][SQL]string function: soundex
Repository: spark Updated Branches: refs/heads/master 3fc0cb920 -> 4d5a6e7b6 [SPARK-8271][SQL]string function: soundex This PR brings SQL function soundex(), see https://issues.apache.org/jira/browse/HIVE-9738 It's based on #7115 , thanks to HuJiayin Author: HuJiayin Author: Davies Liu Closes #7812 from davies/soundex and squashes the following commits: fa75941 [Davies Liu] Merge branch 'master' of github.com:apache/spark into soundex a4bd6d8 [Davies Liu] fix soundex 2538908 [HuJiayin] add codegen soundex d15d329 [HuJiayin] add back ut ded1a14 [HuJiayin] Merge branch 'master' of https://github.com/apache/spark e2dec2c [HuJiayin] support soundex rebase code Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d5a6e7b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d5a6e7b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d5a6e7b Branch: refs/heads/master Commit: 4d5a6e7b60b315968973e22985eb174ec721 Parents: 3fc0cb9 Author: HuJiayin Authored: Fri Jul 31 16:05:26 2015 -0700 Committer: Reynold Xin Committed: Fri Jul 31 16:05:26 2015 -0700 -- python/pyspark/sql/functions.py | 17 +++ .../catalyst/analysis/FunctionRegistry.scala| 1 + .../catalyst/expressions/stringOperations.scala | 16 ++ .../expressions/StringExpressionsSuite.scala| 28 +++ .../scala/org/apache/spark/sql/functions.scala | 8 +++ .../apache/spark/sql/StringFunctionsSuite.scala | 9 .../apache/spark/unsafe/types/UTF8String.java | 53 .../spark/unsafe/types/UTF8StringSuite.java | 48 ++ 8 files changed, 180 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4d5a6e7b/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 8024a8d..bb9926c 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -63,6 +63,8 @@ __all__ += [ 'year', 'quarter', 'month', 'hour', 'minute', 'second', 'dayofmonth', 'dayofyear', 'weekofyear'] +__all__ += ['soundex'] + def _create_function(name, doc=""): """ Create a function for aggregator by name""" @@ -922,6 +924,7 @@ def trunc(date, format): def size(col): """ Collection function: returns the length of the array or map stored in the column. 
+ :param col: name of column or expression >>> df = sqlContext.createDataFrame([([1, 2, 3],),([1],),([],)], ['data']) @@ -932,6 +935,20 @@ def size(col): return Column(sc._jvm.functions.size(_to_java_column(col))) +@since(1.5) +@ignore_unicode_prefix +def soundex(col): +""" +Returns the SoundEx encoding for a string. + +>>> df = sqlContext.createDataFrame([("Peters",),("Uhrbach",)], ['name']) +>>> df.select(soundex(df.name).alias("soundex")).collect() +[Row(soundex=u'P362'), Row(soundex=u'U612')] +""" +sc = SparkContext._active_spark_context +return Column(sc._jvm.functions.soundex(_to_java_column(col))) + + class UserDefinedFunction(object): """ User defined function in Python http://git-wip-us.apache.org/repos/asf/spark/blob/4d5a6e7b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 1bf7204..3f61a9a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -194,6 +194,7 @@ object FunctionRegistry { expression[StringRepeat]("repeat"), expression[StringReverse]("reverse"), expression[StringTrimRight]("rtrim"), +expression[SoundEx]("soundex"), expression[StringSpace]("space"), expression[StringSplit]("split"), expression[Substring]("substr"), http://git-wip-us.apache.org/repos/asf/spark/blob/4d5a6e7b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala index 684eac1..160e72f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala @@ -719,6 +719,22
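A hedged usage sketch of the matching Scala function from the `functions.scala` diff (assuming a 1.5-era `sqlContext` in scope):

```scala
import org.apache.spark.sql.functions.soundex

val df = sqlContext.createDataFrame(Seq(Tuple1("Peters"), Tuple1("Uhrbach"))).toDF("name")
df.select(soundex(df("name"))).show() // P362, U612
```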
spark git commit: [SPARK-9233] [SQL] Enable code-gen in window function unit tests
Repository: spark Updated Branches: refs/heads/master 710c2b5dd -> 3fc0cb920 [SPARK-9233] [SQL] Enable code-gen in window function unit tests Since code-gen is enabled by default, it is better to run window function tests with code-gen. https://issues.apache.org/jira/browse/SPARK-9233 Author: Yin Huai Closes #7832 from yhuai/SPARK-9233 and squashes the following commits: 4e4e4cc [Yin Huai] style ca80e07 [Yin Huai] Test window function with codegen. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3fc0cb92 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3fc0cb92 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3fc0cb92 Branch: refs/heads/master Commit: 3fc0cb92001798167a14c1377362a3335397dd4c Parents: 710c2b5 Author: Yin Huai Authored: Fri Jul 31 14:13:06 2015 -0700 Committer: Yin Huai Committed: Fri Jul 31 14:13:06 2015 -0700 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 9 +++-- .../HiveWindowFunctionQuerySuite.scala | 38 +++- 2 files changed, 12 insertions(+), 35 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3fc0cb92/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 51d910b..f5daba1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -853,8 +853,13 @@ class Analyzer( while (i < groupedWindowExpressions.size) { val ((partitionSpec, orderSpec), windowExpressions) = groupedWindowExpressions(i) // Set currentChild to the newly created Window operator. -currentChild = Window(currentChild.output, windowExpressions, - partitionSpec, orderSpec, currentChild) +currentChild = + Window( +currentChild.output, +windowExpressions, +partitionSpec, +orderSpec, +currentChild) // Move to next Window Spec. i += 1 http://git-wip-us.apache.org/repos/asf/spark/blob/3fc0cb92/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala -- diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala index 24a758f..92bb9e6 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.util.Utils * for different tests and there are a few properties needed to let Hive generate golden * files, every `createQueryTest` calls should explicitly set `reset` to `false`. 
*/ -abstract class HiveWindowFunctionQueryBaseSuite extends HiveComparisonTest with BeforeAndAfter { +class HiveWindowFunctionQuerySuite extends HiveComparisonTest with BeforeAndAfter { private val originalTimeZone = TimeZone.getDefault private val originalLocale = Locale.getDefault private val testTempDir = Utils.createTempDir() @@ -759,21 +759,7 @@ abstract class HiveWindowFunctionQueryBaseSuite extends HiveComparisonTest with """.stripMargin, reset = false) } -class HiveWindowFunctionQueryWithoutCodeGenSuite extends HiveWindowFunctionQueryBaseSuite { - var originalCodegenEnabled: Boolean = _ - override def beforeAll(): Unit = { -super.beforeAll() -originalCodegenEnabled = conf.codegenEnabled -sql("set spark.sql.codegen=false") - } - - override def afterAll(): Unit = { -sql(s"set spark.sql.codegen=$originalCodegenEnabled") -super.afterAll() - } -} - -abstract class HiveWindowFunctionQueryFileBaseSuite +class HiveWindowFunctionQueryFileSuite extends HiveCompatibilitySuite with BeforeAndAfter { private val originalTimeZone = TimeZone.getDefault private val originalLocale = Locale.getDefault @@ -789,11 +775,11 @@ abstract class HiveWindowFunctionQueryFileBaseSuite // The following settings are used for generating golden files with Hive. // We have to use kryo to correctly let Hive serialize plans with window functions. // This is used to generate golden files. -sql("set hive.plan.serialization.format=kryo") +// sql("set hive.plan.serialization
spark git commit: [SPARK-9324] [SPARK-9322] [SPARK-9321] [SPARKR] Some aliases for R-like functions in DataFrames
Repository: spark Updated Branches: refs/heads/master 82f47b811 -> 710c2b5dd [SPARK-9324] [SPARK-9322] [SPARK-9321] [SPARKR] Some aliases for R-like functions in DataFrames Adds following aliases: * unique (distinct) * rbind (unionAll): accepts many DataFrames * nrow (count) * ncol * dim * names (columns): along with the replacement function to change names Author: Hossein Closes #7764 from falaki/sparkR-alias and squashes the following commits: 56016f5 [Hossein] Updated R documentation 5e4a4d0 [Hossein] Removed extra code f51cbef [Hossein] Merge branch 'master' into sparkR-alias c1b88bd [Hossein] Moved setGeneric and other comments applied d9307f8 [Hossein] Added tests b5aa988 [Hossein] Added dim, ncol, nrow, names, rbind, and unique functions to DataFrames Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/710c2b5d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/710c2b5d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/710c2b5d Branch: refs/heads/master Commit: 710c2b5dd2dc6b8d947303ad8dfae4539b63fe11 Parents: 82f47b8 Author: Hossein Authored: Fri Jul 31 14:07:41 2015 -0700 Committer: Shivaram Venkataraman Committed: Fri Jul 31 14:08:18 2015 -0700 -- R/pkg/NAMESPACE | 6 +++ R/pkg/R/DataFrame.R | 90 +++ R/pkg/R/generics.R | 4 ++ R/pkg/inst/tests/test_sparkSQL.R | 22 +++-- 4 files changed, 119 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/710c2b5d/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index a329e14..ff116cb 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -29,6 +29,7 @@ exportMethods("arrange", "count", "crosstab", "describe", + "dim", "distinct", "dropna", "dtypes", @@ -45,11 +46,15 @@ exportMethods("arrange", "isLocal", "join", "limit", + "names", + "ncol", + "nrow", "orderBy", "mutate", "names", "persist", "printSchema", + "rbind", "registerTempTable", "rename", "repartition", @@ -66,6 +71,7 @@ exportMethods("arrange", "summarize", "take", "unionAll", + "unique", "unpersist", "where", "withColumn", http://git-wip-us.apache.org/repos/asf/spark/blob/710c2b5d/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index b31ad37..b4065d2 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -255,6 +255,16 @@ setMethod("names", columns(x) }) +#' @rdname columns +setMethod("names<-", + signature(x = "DataFrame"), + function(x, value) { +if (!is.null(value)) { + sdf <- callJMethod(x@sdf, "toDF", listToSeq(as.list(value))) + dataFrame(sdf) +} + }) + #' Register Temporary Table #' #' Registers a DataFrame as a Temporary Table in the SQLContext @@ -473,6 +483,18 @@ setMethod("distinct", dataFrame(sdf) }) +#' @title Distinct rows in a DataFrame +# +#' @description Returns a new DataFrame containing distinct rows in this DataFrame +#' +#' @rdname unique +#' @aliases unique +setMethod("unique", + signature(x = "DataFrame"), + function(x) { +distinct(x) + }) + #' Sample #' #' Return a sampled subset of this DataFrame using a random seed. 
@@ -534,6 +556,58 @@ setMethod("count", callJMethod(x@sdf, "count") }) +#' @title Number of rows for a DataFrame +#' @description Returns the number of rows in a DataFrame +#' +#' @name nrow +#' +#' @rdname nrow +#' @aliases count +setMethod("nrow", + signature(x = "DataFrame"), + function(x) { +count(x) + }) + +#' Returns the number of columns in a DataFrame +#' +#' @param x a SparkSQL DataFrame +#' +#' @rdname ncol +#' @export +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' sqlContext <- sparkRSQL.init(sc) +#' path <- "path/to/file.json" +#' df <- jsonFile(sqlContext, path) +#' ncol(df) +#' } +setMethod("ncol", + signature(x = "DataFrame"), + function(x) { +length(columns(x)) + }) + +#' Returns the dimensions (number of rows and columns) of a DataFrame +#' @param x a SparkSQL DataFrame +#' +#' @rdname dim +#' @export +#' @examples +#'\don
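The new R functions are thin aliases: each one delegates through `callJMethod` to an existing JVM-side DataFrame method. A minimal Scala sketch of the corresponding calls, for orientation only — the `df`/`other` frames and the column names are illustrative assumptions, not part of the patch:
```
// Illustrative only: the Scala DataFrame operations behind the new R aliases.
import org.apache.spark.sql.DataFrame

def rAliasEquivalents(df: DataFrame, other: DataFrame): Unit = {
  val deduped = df.distinct()        // R: unique(df)
  val stacked = df.unionAll(other)   // R: rbind(df, other); assumes compatible schemas
  val rows    = df.count()           // R: nrow(df)
  val cols    = df.columns.length    // R: ncol(df)
  val dims    = (rows, cols)         // R: dim(df)
  val renamed = df.toDF("a", "b")    // R: names(df) <- c("a", "b"); assumes two columns
  println(s"dims = $dims, renamed columns = ${renamed.columns.mkString(", ")}, " +
    s"distinct = ${deduped.count()}, stacked = ${stacked.count()}")
}
```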
spark git commit: [SPARK-9507] [BUILD] Remove dependency reduced POM hack now that shade plugin is updated
Repository: spark Updated Branches: refs/heads/branch-1.3 f941482b0 -> 047a61365 [SPARK-9507] [BUILD] Remove dependency reduced POM hack now that shade plugin is updated Update to shade plugin 2.4.1, which removes the need for the dependency-reduced-POM workaround and the 'release' profile. Fix management of shade plugin version so children inherit it; bump assembly plugin version while here See https://issues.apache.org/jira/browse/SPARK-8819 I verified that `mvn clean package -DskipTests` works with Maven 3.3.3. pwendell are you up for trying this for the 1.5.0 release? Author: Sean Owen Closes #7826 from srowen/SPARK-9507 and squashes the following commits: e0b0fd2 [Sean Owen] Update to shade plugin 2.4.1, which removes the need for the dependency-reduced-POM workaround and the 'release' profile. Fix management of shade plugin version so children inherit it; bump assembly plugin version while here (cherry picked from commit 6e5fd613ea4b9aa0ab485ba681277a51a4367168) Signed-off-by: Sean Owen # Conflicts: # dev/create-release/create-release.sh # pom.xml Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/047a6136 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/047a6136 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/047a6136 Branch: refs/heads/branch-1.3 Commit: 047a61365fa795f8e646b61049cde038d074a78c Parents: f941482 Author: Sean Owen Authored: Fri Jul 31 21:51:55 2015 +0100 Committer: Sean Owen Committed: Fri Jul 31 22:05:57 2015 +0100 -- dev/create-release/create-release.sh | 8 pom.xml | 29 + 2 files changed, 9 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/047a6136/dev/create-release/create-release.sh -- diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh index 0979d5e..120914f 100755 --- a/dev/create-release/create-release.sh +++ b/dev/create-release/create-release.sh @@ -117,13 +117,13 @@ if [[ ! "$@" =~ --skip-publish ]]; then echo "Created Nexus staging repository: $staged_repo_id" build/mvn -DskipTests -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \ --Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl -Prelease-profile \ +-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \ clean install ./dev/change-version-to-2.11.sh - + build/mvn -DskipTests -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \ --Dscala-2.11 -Pyarn -Phive -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl -Prelease-profile \ +-Dscala-2.11 -Pyarn -Phive -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \ clean install ./dev/change-version-to-2.10.sh @@ -197,7 +197,7 @@ if [[ ! "$@" =~ --skip-package ]]; then NAME=$1 FLAGS=$2 cp -r spark spark-$RELEASE_VERSION-bin-$NAME - + cd spark-$RELEASE_VERSION-bin-$NAME # TODO There should probably be a flag to make-distribution to allow 2.11 support http://git-wip-us.apache.org/repos/asf/spark/blob/047a6136/pom.xml -- diff --git a/pom.xml b/pom.xml index bcc2f51..099b002 100644 --- a/pom.xml +++ b/pom.xml @@ -156,7 +156,6 @@ 1.8.8 2.4.4 1.1.1.7 -false - ${create.dependency.reduced.pom} @@ -1751,26 +1752,6 @@ - - - release-profile - - -true - - -
spark git commit: [SPARK-9510] [SPARKR] Remaining SparkR style fixes
Repository: spark Updated Branches: refs/heads/master 6e5fd613e -> 82f47b811 [SPARK-9510] [SPARKR] Remaining SparkR style fixes With the change in this patch, I get no more warnings from `./dev/lint-r` on my machine Author: Shivaram Venkataraman Closes #7834 from shivaram/sparkr-style-fixes and squashes the following commits: 716cd8e [Shivaram Venkataraman] Remaining SparkR style fixes Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82f47b81 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82f47b81 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82f47b81 Branch: refs/heads/master Commit: 82f47b811607a1cba437fe0ffc15d4e5f9ec Parents: 6e5fd61 Author: Shivaram Venkataraman Authored: Fri Jul 31 14:02:44 2015 -0700 Committer: Shivaram Venkataraman Committed: Fri Jul 31 14:02:44 2015 -0700 -- R/pkg/R/RDD.R| 6 +++--- R/pkg/inst/tests/test_sparkSQL.R | 4 +++- 2 files changed, 6 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/82f47b81/R/pkg/R/RDD.R -- diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R index 2a013b3..051e441 100644 --- a/R/pkg/R/RDD.R +++ b/R/pkg/R/RDD.R @@ -1264,12 +1264,12 @@ setMethod("pipeRDD", signature(x = "RDD", command = "character"), function(x, command, env = list()) { func <- function(part) { - trim.trailing.func <- function(x) { + trim_trailing_func <- function(x) { sub("[\r\n]*$", "", toString(x)) } - input <- unlist(lapply(part, trim.trailing.func)) + input <- unlist(lapply(part, trim_trailing_func)) res <- system2(command, stdout = TRUE, input = input, env = env) - lapply(res, trim.trailing.func) + lapply(res, trim_trailing_func) } lapplyPartition(x, func) }) http://git-wip-us.apache.org/repos/asf/spark/blob/82f47b81/R/pkg/inst/tests/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index aca41aa..25f6973 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -128,7 +128,9 @@ test_that("create DataFrame from RDD", { expect_equal(dtypes(df2), list(c("name", "string"), c("age", "int"), c("height", "float"))) expect_equal(collect(where(df2, df2$name == "Bob")), c("Bob", 16, 176.5)) - localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18), height=c(164.10, 181.4, 173.7)) + localDF <- data.frame(name=c("John", "Smith", "Sarah"), +age=c(19, 23, 18), +height=c(164.10, 181.4, 173.7)) df <- createDataFrame(sqlContext, localDF, schema) expect_is(df, "DataFrame") expect_equal(count(df), 3)
spark git commit: [SPARK-9507] [BUILD] Remove dependency reduced POM hack now that shade plugin is updated
Repository: spark Updated Branches: refs/heads/master 873ab0f96 -> 6e5fd613e [SPARK-9507] [BUILD] Remove dependency reduced POM hack now that shade plugin is updated Update to shade plugin 2.4.1, which removes the need for the dependency-reduced-POM workaround and the 'release' profile. Fix management of shade plugin version so children inherit it; bump assembly plugin version while here See https://issues.apache.org/jira/browse/SPARK-8819 I verified that `mvn clean package -DskipTests` works with Maven 3.3.3. pwendell are you up for trying this for the 1.5.0 release? Author: Sean Owen Closes #7826 from srowen/SPARK-9507 and squashes the following commits: e0b0fd2 [Sean Owen] Update to shade plugin 2.4.1, which removes the need for the dependency-reduced-POM workaround and the 'release' profile. Fix management of shade plugin version so children inherit it; bump assembly plugin version while here Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e5fd613 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e5fd613 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e5fd613 Branch: refs/heads/master Commit: 6e5fd613ea4b9aa0ab485ba681277a51a4367168 Parents: 873ab0f Author: Sean Owen Authored: Fri Jul 31 21:51:55 2015 +0100 Committer: Sean Owen Committed: Fri Jul 31 21:51:55 2015 +0100 -- dev/create-release/create-release.sh | 4 ++-- pom.xml | 33 ++- 2 files changed, 8 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6e5fd613/dev/create-release/create-release.sh -- diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh index 86a7a40..4311c8c 100755 --- a/dev/create-release/create-release.sh +++ b/dev/create-release/create-release.sh @@ -118,13 +118,13 @@ if [[ ! "$@" =~ --skip-publish ]]; then rm -rf $SPARK_REPO - build/mvn -DskipTests -Pyarn -Phive -Prelease\ + build/mvn -DskipTests -Pyarn -Phive \ -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \ clean install ./dev/change-scala-version.sh 2.11 - build/mvn -DskipTests -Pyarn -Phive -Prelease\ + build/mvn -DskipTests -Pyarn -Phive \ -Dscala-2.11 -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \ clean install http://git-wip-us.apache.org/repos/asf/spark/blob/6e5fd613/pom.xml -- diff --git a/pom.xml b/pom.xml index e351c7c..1371a1b 100644 --- a/pom.xml +++ b/pom.xml @@ -160,9 +160,6 @@ 2.4.4 1.1.1.7 1.1.2 - -false - ${java.home} - ${create.dependency.reduced.pom} @@ -1836,26 +1835,6 @@ - - - release - - -true - - -
spark git commit: [SPARK-9507] [BUILD] Remove dependency reduced POM hack now that shade plugin is updated
Repository: spark Updated Branches: refs/heads/branch-1.4 5ad9f950c -> b53ca247d [SPARK-9507] [BUILD] Remove dependency reduced POM hack now that shade plugin is updated Update to shade plugin 2.4.1, which removes the need for the dependency-reduced-POM workaround and the 'release' profile. Fix management of shade plugin version so children inherit it; bump assembly plugin version while here See https://issues.apache.org/jira/browse/SPARK-8819 I verified that `mvn clean package -DskipTests` works with Maven 3.3.3. pwendell are you up for trying this for the 1.5.0 release? Author: Sean Owen Closes #7826 from srowen/SPARK-9507 and squashes the following commits: e0b0fd2 [Sean Owen] Update to shade plugin 2.4.1, which removes the need for the dependency-reduced-POM workaround and the 'release' profile. Fix management of shade plugin version so children inherit it; bump assembly plugin version while here (cherry picked from commit 6e5fd613ea4b9aa0ab485ba681277a51a4367168) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b53ca247 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b53ca247 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b53ca247 Branch: refs/heads/branch-1.4 Commit: b53ca247d4a965002a9f31758ea2b28fe117d45f Parents: 5ad9f95 Author: Sean Owen Authored: Fri Jul 31 21:51:55 2015 +0100 Committer: Sean Owen Committed: Fri Jul 31 21:52:11 2015 +0100 -- dev/create-release/create-release.sh | 4 ++-- pom.xml | 33 ++- 2 files changed, 8 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b53ca247/dev/create-release/create-release.sh -- diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh index 30190dc..54274a8 100755 --- a/dev/create-release/create-release.sh +++ b/dev/create-release/create-release.sh @@ -118,13 +118,13 @@ if [[ ! "$@" =~ --skip-publish ]]; then rm -rf $SPARK_REPO - build/mvn -DskipTests -Pyarn -Phive -Prelease\ + build/mvn -DskipTests -Pyarn -Phive \ -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \ clean install ./dev/change-version-to-2.11.sh - build/mvn -DskipTests -Pyarn -Phive -Prelease\ + build/mvn -DskipTests -Pyarn -Phive \ -Dscala-2.11 -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \ clean install http://git-wip-us.apache.org/repos/asf/spark/blob/b53ca247/pom.xml -- diff --git a/pom.xml b/pom.xml index 53e9388..6c102da 100644 --- a/pom.xml +++ b/pom.xml @@ -162,9 +162,6 @@ 2.4.4 1.1.1.7 1.1.2 - -false - ${java.home} - ${create.dependency.reduced.pom} @@ -1817,26 +1816,6 @@ - - - release - - -true - - -
spark git commit: [SPARK-9490] [DOCS] [MLLIB] MLlib evaluation metrics guide example python code uses deprecated print statement
Repository: spark Updated Branches: refs/heads/master 815c8245f -> 873ab0f96 [SPARK-9490] [DOCS] [MLLIB] MLlib evaluation metrics guide example python code uses deprecated print statement Use print(x) not print x for Python 3 in eval examples CC sethah mengxr -- just wanted to close this out before 1.5 Author: Sean Owen Closes #7822 from srowen/SPARK-9490 and squashes the following commits: 01abeba [Sean Owen] Change "print x" to "print(x)" in the rest of the docs too bd7f7fb [Sean Owen] Use print(x) not print x for Python 3 in eval examples Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/873ab0f9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/873ab0f9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/873ab0f9 Branch: refs/heads/master Commit: 873ab0f9692d8ea6220abdb8d9200041068372a8 Parents: 815c824 Author: Sean Owen Authored: Fri Jul 31 13:45:28 2015 -0700 Committer: Xiangrui Meng Committed: Fri Jul 31 13:45:28 2015 -0700 -- docs/ml-guide.md| 2 +- docs/mllib-evaluation-metrics.md| 66 docs/mllib-feature-extraction.md| 2 +- docs/mllib-statistics.md| 20 +- docs/quick-start.md | 2 +- docs/sql-programming-guide.md | 6 +-- docs/streaming-programming-guide.md | 2 +- 7 files changed, 50 insertions(+), 50 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/873ab0f9/docs/ml-guide.md -- diff --git a/docs/ml-guide.md b/docs/ml-guide.md index 8c46adf..b6ca50e 100644 --- a/docs/ml-guide.md +++ b/docs/ml-guide.md @@ -561,7 +561,7 @@ test = sc.parallelize([(4L, "spark i j k"), prediction = model.transform(test) selected = prediction.select("id", "text", "prediction") for row in selected.collect(): -print row +print(row) sc.stop() {% endhighlight %} http://git-wip-us.apache.org/repos/asf/spark/blob/873ab0f9/docs/mllib-evaluation-metrics.md -- diff --git a/docs/mllib-evaluation-metrics.md b/docs/mllib-evaluation-metrics.md index 4ca0bb0..7066d5c 100644 --- a/docs/mllib-evaluation-metrics.md +++ b/docs/mllib-evaluation-metrics.md @@ -302,10 +302,10 @@ predictionAndLabels = test.map(lambda lp: (float(model.predict(lp.features)), lp metrics = BinaryClassificationMetrics(predictionAndLabels) # Area under precision-recall curve -print "Area under PR = %s" % metrics.areaUnderPR +print("Area under PR = %s" % metrics.areaUnderPR) # Area under ROC curve -print "Area under ROC = %s" % metrics.areaUnderROC +print("Area under ROC = %s" % metrics.areaUnderROC) {% endhighlight %} @@ -606,24 +606,24 @@ metrics = MulticlassMetrics(predictionAndLabels) precision = metrics.precision() recall = metrics.recall() f1Score = metrics.fMeasure() -print "Summary Stats" -print "Precision = %s" % precision -print "Recall = %s" % recall -print "F1 Score = %s" % f1Score +print("Summary Stats") +print("Precision = %s" % precision) +print("Recall = %s" % recall) +print("F1 Score = %s" % f1Score) # Statistics by class labels = data.map(lambda lp: lp.label).distinct().collect() for label in sorted(labels): -print "Class %s precision = %s" % (label, metrics.precision(label)) -print "Class %s recall = %s" % (label, metrics.recall(label)) -print "Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, beta=1.0)) +print("Class %s precision = %s" % (label, metrics.precision(label))) +print("Class %s recall = %s" % (label, metrics.recall(label))) +print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, beta=1.0))) # Weighted stats -print "Weighted recall = %s" % metrics.weightedRecall -print "Weighted precision = %s" % 
metrics.weightedPrecision -print "Weighted F(1) Score = %s" % metrics.weightedFMeasure() -print "Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5) -print "Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate +print("Weighted recall = %s" % metrics.weightedRecall) +print("Weighted precision = %s" % metrics.weightedPrecision) +print("Weighted F(1) Score = %s" % metrics.weightedFMeasure()) +print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5)) +print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate) {% endhighlight %} @@ -881,28 +881,28 @@ scoreAndLabels = sc.parallelize([ metrics = MultilabelMetrics(scoreAndLabels) # Summary stats -print "Recall = %s" % metrics.recall() -print "Precision = %s" % metrics.precision() -print "F1 measure = %s" % metrics.f1Measure() -print "Accuracy = %s" % metrics.accuracy +print("Recall = %s" % metrics.recall()) +print("Precision = %s" % metrics.precision()) +
spark git commit: [SPARK-9466] [SQL] Increase two timeouts in CliSuite.
Repository: spark Updated Branches: refs/heads/master fbef566a1 -> 815c8245f [SPARK-9466] [SQL] Increase two timeouts in CliSuite. Hopefully this can resolve the flakiness of this suite. JIRA: https://issues.apache.org/jira/browse/SPARK-9466 Author: Yin Huai Closes # from yhuai/SPARK-9466 and squashes the following commits: e0e3a86 [Yin Huai] Increase the timeout. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/815c8245 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/815c8245 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/815c8245 Branch: refs/heads/master Commit: 815c8245f47e61226a04e2e02f508457b5e9e536 Parents: fbef566 Author: Yin Huai Authored: Fri Jul 31 13:45:12 2015 -0700 Committer: Yin Huai Committed: Fri Jul 31 13:45:12 2015 -0700 -- .../scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/815c8245/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala -- diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 13b0c59..df80d04 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -137,7 +137,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging { } test("Single command with --database") { -runCliWithin(1.minute)( +runCliWithin(2.minute)( "CREATE DATABASE hive_test_db;" -> "OK", "USE hive_test_db;" @@ -148,7 +148,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging { -> "Time taken: " ) -runCliWithin(1.minute, Seq("--database", "hive_test_db", "-e", "SHOW TABLES;"))( +runCliWithin(2.minute, Seq("--database", "hive_test_db", "-e", "SHOW TABLES;"))( "" -> "OK", ""
spark git commit: [SPARK-9308] [ML] ml.NaiveBayesModel support predicting class probabilities
Repository: spark Updated Branches: refs/heads/master 060c79aab -> fbef566a1 [SPARK-9308] [ML] ml.NaiveBayesModel support predicting class probabilities Make NaiveBayesModel support predicting class probabilities, inherit from ProbabilisticClassificationModel. Author: Yanbo Liang Closes #7672 from yanboliang/spark-9308 and squashes the following commits: 25e224c [Yanbo Liang] raw2probabilityInPlace should operate in-place 3ee56d6 [Yanbo Liang] change predictRaw and raw2probabilityInPlace c07e7a2 [Yanbo Liang] ml.NaiveBayesModel support predicting class probabilities Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbef566a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbef566a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbef566a Branch: refs/heads/master Commit: fbef566a107b47e5fddde0ea65b8587d5039062d Parents: 060c79a Author: Yanbo Liang Authored: Fri Jul 31 13:11:42 2015 -0700 Committer: Joseph K. Bradley Committed: Fri Jul 31 13:11:42 2015 -0700 -- .../spark/ml/classification/NaiveBayes.scala| 65 +++- .../ml/classification/NaiveBayesSuite.scala | 54 +++- 2 files changed, 101 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fbef566a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala index 5be35fe..b46b676 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala @@ -69,7 +69,7 @@ private[ml] trait NaiveBayesParams extends PredictorParams { * The input feature values must be nonnegative. 
*/ class NaiveBayes(override val uid: String) - extends Predictor[Vector, NaiveBayes, NaiveBayesModel] + extends ProbabilisticClassifier[Vector, NaiveBayes, NaiveBayesModel] with NaiveBayesParams { def this() = this(Identifiable.randomUID("nb")) @@ -106,7 +106,7 @@ class NaiveBayesModel private[ml] ( override val uid: String, val pi: Vector, val theta: Matrix) - extends PredictionModel[Vector, NaiveBayesModel] with NaiveBayesParams { + extends ProbabilisticClassificationModel[Vector, NaiveBayesModel] with NaiveBayesParams { import OldNaiveBayes.{Bernoulli, Multinomial} @@ -129,29 +129,62 @@ class NaiveBayesModel private[ml] ( throw new UnknownError(s"Invalid modelType: ${$(modelType)}.") } - override protected def predict(features: Vector): Double = { + override val numClasses: Int = pi.size + + private def multinomialCalculation(features: Vector) = { +val prob = theta.multiply(features) +BLAS.axpy(1.0, pi, prob) +prob + } + + private def bernoulliCalculation(features: Vector) = { +features.foreachActive((_, value) => + if (value != 0.0 && value != 1.0) { +throw new SparkException( + s"Bernoulli naive Bayes requires 0 or 1 feature values but found $features.") + } +) +val prob = thetaMinusNegTheta.get.multiply(features) +BLAS.axpy(1.0, pi, prob) +BLAS.axpy(1.0, negThetaSum.get, prob) +prob + } + + override protected def predictRaw(features: Vector): Vector = { $(modelType) match { case Multinomial => -val prob = theta.multiply(features) -BLAS.axpy(1.0, pi, prob) -prob.argmax +multinomialCalculation(features) case Bernoulli => -features.foreachActive{ (index, value) => - if (value != 0.0 && value != 1.0) { -throw new SparkException( - s"Bernoulli naive Bayes requires 0 or 1 feature values but found $features") - } -} -val prob = thetaMinusNegTheta.get.multiply(features) -BLAS.axpy(1.0, pi, prob) -BLAS.axpy(1.0, negThetaSum.get, prob) -prob.argmax +bernoulliCalculation(features) case _ => // This should never happen. throw new UnknownError(s"Invalid modelType: ${$(modelType)}.") } } + override protected def raw2probabilityInPlace(rawPrediction: Vector): Vector = { +rawPrediction match { + case dv: DenseVector => +var i = 0 +val size = dv.size +val maxLog = dv.values.max +while (i < size) { + dv.values(i) = math.exp(dv.values(i) - maxLog) + i += 1 +} +val probSum = dv.values.sum +i = 0 +while (i < size) { + dv.values(i) = dv.values(i) / probSum + i += 1 +} +dv + case sv: SparseVector => +throw new RuntimeException(
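The heart of the new `raw2probabilityInPlace` is a numerically stable normalization of the class log-posteriors: shift by the maximum, exponentiate, and divide by the sum. A standalone sketch of that computation over a plain array (the MLlib version mutates a `DenseVector` in place):
```
// Turn unnormalized log-posteriors into class probabilities.
def logsToProbabilities(raw: Array[Double]): Array[Double] = {
  val maxLog = raw.max
  // Subtracting the max before exp avoids overflow for large-magnitude logs.
  val exps = raw.map(v => math.exp(v - maxLog))
  val total = exps.sum
  exps.map(_ / total)
}

// logsToProbabilities(Array(-2.0, -1.0, -3.0)) ≈ Array(0.245, 0.665, 0.090)
```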
spark git commit: [SPARK-9056] [STREAMING] Rename configuration `spark.streaming.minRememberDuration` to `spark.streaming.fileStream.minRememberDuration`
Repository: spark Updated Branches: refs/heads/master 3c0d2e552 -> 060c79aab [SPARK-9056] [STREAMING] Rename configuration `spark.streaming.minRememberDuration` to `spark.streaming.fileStream.minRememberDuration` Rename configuration `spark.streaming.minRememberDuration` to `spark.streaming.fileStream.minRememberDuration` Author: Sameer Abhyankar Author: Sameer Abhyankar Closes #7740 from sabhyankar/spark_branch_9056 and squashes the following commits: d5b2f1f [Sameer Abhyankar] Correct deprecated version to 1.5 1268133 [Sameer Abhyankar] Add {} and indentation ddf9844 [Sameer Abhyankar] Change 4 space indentation to 2 space indentation 1819b5f [Sameer Abhyankar] Use spark.streaming.fileStream.minRememberDuration property in lieu of spark.streaming.minRememberDuration Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/060c79aa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/060c79aa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/060c79aa Branch: refs/heads/master Commit: 060c79aab58efd4ce7353a1b00534de0d9e1de0b Parents: 3c0d2e5 Author: Sameer Abhyankar Authored: Fri Jul 31 13:08:55 2015 -0700 Committer: Tathagata Das Committed: Fri Jul 31 13:08:55 2015 -0700 -- core/src/main/scala/org/apache/spark/SparkConf.scala | 4 +++- .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 6 -- 2 files changed, 7 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/060c79aa/core/src/main/scala/org/apache/spark/SparkConf.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 4161792..08bab4b 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -548,7 +548,9 @@ private[spark] object SparkConf extends Logging { "spark.rpc.askTimeout" -> Seq( AlternateConfig("spark.akka.askTimeout", "1.4")), "spark.rpc.lookupTimeout" -> Seq( - AlternateConfig("spark.akka.lookupTimeout", "1.4")) + AlternateConfig("spark.akka.lookupTimeout", "1.4")), +"spark.streaming.fileStream.minRememberDuration" -> Seq( + AlternateConfig("spark.streaming.minRememberDuration", "1.5")) ) /** http://git-wip-us.apache.org/repos/asf/spark/blob/060c79aa/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index dd4da9d..c358f5b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -86,8 +86,10 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( * Files with mod times older than this "window" of remembering will be ignored. So if new * files are visible within this window, then the file will get selected in the next batch. 
*/ - private val minRememberDurationS = -Seconds(ssc.conf.getTimeAsSeconds("spark.streaming.minRememberDuration", "60s")) + private val minRememberDurationS = { + Seconds(ssc.conf.getTimeAsSeconds("spark.streaming.fileStream.minRememberDuration", + ssc.conf.get("spark.streaming.minRememberDuration", "60s"))) + } // This is a def so that it works during checkpoint recovery: private def clock = ssc.scheduler.clock
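The diff's lookup chain, extracted into a minimal sketch against a bare `SparkConf`: the new key wins, the deprecated key is consulted next, and the `60s` default applies last.
```
import org.apache.spark.SparkConf

val conf = new SparkConf()
// Prefer the new key; fall back to the deprecated key, then to the 60s default.
val minRememberSeconds: Long = conf.getTimeAsSeconds(
  "spark.streaming.fileStream.minRememberDuration",
  conf.get("spark.streaming.minRememberDuration", "60s"))
```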
spark git commit: [SPARK-9246] [MLLIB] DistributedLDAModel predict top docs per topic
Repository: spark Updated Branches: refs/heads/master c0686668a -> 3c0d2e552 [SPARK-9246] [MLLIB] DistributedLDAModel predict top docs per topic Add topDocumentsPerTopic to DistributedLDAModel. Add ScalaDoc and unit tests. Author: Meihua Wu Closes #7769 from rotationsymmetry/SPARK-9246 and squashes the following commits: 1029e79c [Meihua Wu] clean up code comments a023b82 [Meihua Wu] Update tests to use Long for doc index. 91e5998 [Meihua Wu] Use Long for doc index. b9f70cf [Meihua Wu] Revise topDocumentsPerTopic 26ff3f6 [Meihua Wu] Add topDocumentsPerTopic, scala doc and unit tests Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c0d2e55 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c0d2e55 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c0d2e55 Branch: refs/heads/master Commit: 3c0d2e55210735e0df2f8febb5f63c224af230e3 Parents: c068666 Author: Meihua Wu Authored: Fri Jul 31 13:01:10 2015 -0700 Committer: Joseph K. Bradley Committed: Fri Jul 31 13:01:10 2015 -0700 -- .../spark/mllib/clustering/LDAModel.scala | 37 .../spark/mllib/clustering/LDASuite.scala | 22 2 files changed, 59 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3c0d2e55/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala index ff7035d..0cdac84 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala @@ -516,6 +516,43 @@ class DistributedLDAModel private[clustering] ( } } + /** + * Return the top documents for each topic + * + * This is approximate; it may not return exactly the top-weighted documents for each topic. + * To get a more precise set of top documents, increase maxDocumentsPerTopic. + * + * @param maxDocumentsPerTopic Maximum number of documents to collect for each topic. + * @return Array over topics. Each element is represented as a pair of matching arrays: + * (IDs for the documents, weights of the topic in these documents). + * For each topic, documents are sorted in order of decreasing topic weights. + */ + def topDocumentsPerTopic(maxDocumentsPerTopic: Int): Array[(Array[Long], Array[Double])] = { +val numTopics = k +val topicsInQueues: Array[BoundedPriorityQueue[(Double, Long)]] = + topicDistributions.mapPartitions { docVertices => +// For this partition, collect the most common docs for each topic in queues: +// queues(topic) = queue of (doc topic, doc ID). +val queues = + Array.fill(numTopics)(new BoundedPriorityQueue[(Double, Long)](maxDocumentsPerTopic)) +for ((docId, docTopics) <- docVertices) { + var topic = 0 + while (topic < numTopics) { +queues(topic) += (docTopics(topic) -> docId) +topic += 1 + } +} +Iterator(queues) + }.treeReduce { (q1, q2) => +q1.zip(q2).foreach { case (a, b) => a ++= b } +q1 + } +topicsInQueues.map { q => + val (docTopics, docs) = q.toArray.sortBy(-_._1).unzip + (docs.toArray, docTopics.toArray) +} + } + // TODO // override def logLikelihood(documents: RDD[(Long, Vector)]): Double = ???
http://git-wip-us.apache.org/repos/asf/spark/blob/3c0d2e55/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index 79d2a1c..f2b9470 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -122,6 +122,28 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext { // Check: log probabilities assert(model.logLikelihood < 0.0) assert(model.logPrior < 0.0) + +// Check: topDocumentsPerTopic +// Compare it with top documents per topic derived from topicDistributions +val topDocsByTopicDistributions = { n: Int => + Range(0, k).map { topic => +val (doc, docWeights) = topicDistributions.sortBy(-_._2(topic)).take(n).unzip +(doc.toArray, docWeights.map(_(topic)).toArray) + }.toArray +} + +// Top 3 documents per topic +model.topDocumentsPerTopic(3).zip(topDocsByTopicDistributions(3)).foreach {case (t1, t2) => + assert(t1
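`BoundedPriorityQueue` is a Spark-internal collection, so a self-contained sketch of the same keep-top-k pattern can use the standard library's `PriorityQueue` as a min-heap on weight. This is a single-machine stand-in for the `mapPartitions`/`treeReduce` pipeline above, not the patch's code:
```
import scala.collection.mutable

// Keep the k highest-weight (weight, docId) pairs from a stream of candidates.
def topK(k: Int, weighted: Iterator[(Double, Long)]): Array[(Double, Long)] = {
  // Reversed ordering makes the queue head the *smallest* retained weight.
  val heap =
    mutable.PriorityQueue.empty[(Double, Long)](Ordering.by[(Double, Long), Double](-_._1))
  weighted.foreach { wd =>
    heap.enqueue(wd)
    if (heap.size > k) heap.dequeue() // evict the current minimum
  }
  heap.toArray.sortBy(-_._1) // sorted by decreasing weight
}
```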
spark git commit: [SPARK-9202] capping maximum number of executor&driver information kept in Worker
Repository: spark Updated Branches: refs/heads/master a8340fa7d -> c0686668a [SPARK-9202] capping maximum number of executor&driver information kept in Worker https://issues.apache.org/jira/browse/SPARK-9202 Author: CodingCat Closes #7714 from CodingCat/SPARK-9202 and squashes the following commits: 23977fb [CodingCat] add comments about why we don't synchronize finishedExecutors & finishedDrivers dc9772d [CodingCat] addressing the comments e125241 [CodingCat] stylistic fix 80bfe52 [CodingCat] fix JsonProtocolSuite d7d9485 [CodingCat] styistic fix and respect insert ordering 031755f [CodingCat] add license info & stylistic fix c3b5361 [CodingCat] test cases and docs c557b3a [CodingCat] applications are fine 9cac751 [CodingCat] application is fine... ad87ed7 [CodingCat] trimFinishedExecutorsAndDrivers Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0686668 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0686668 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0686668 Branch: refs/heads/master Commit: c0686668ae6a92b6bb4801a55c3b78aedbee816a Parents: a8340fa Author: CodingCat Authored: Fri Jul 31 20:27:00 2015 +0100 Committer: Sean Owen Committed: Fri Jul 31 20:27:00 2015 +0100 -- .../org/apache/spark/deploy/worker/Worker.scala | 124 +++-- .../spark/deploy/worker/ui/WorkerWebUI.scala| 4 +- .../apache/spark/deploy/DeployTestUtils.scala | 89 + .../apache/spark/deploy/JsonProtocolSuite.scala | 59 ++-- .../spark/deploy/worker/WorkerSuite.scala | 133 ++- docs/configuration.md | 14 ++ 6 files changed, 329 insertions(+), 94 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c0686668/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 82e9578..0276c24 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -25,7 +25,7 @@ import java.util.concurrent._ import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture} import scala.collection.JavaConversions._ -import scala.collection.mutable.{HashMap, HashSet} +import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap} import scala.concurrent.ExecutionContext import scala.util.Random import scala.util.control.NonFatal @@ -115,13 +115,18 @@ private[worker] class Worker( } var workDir: File = null - val finishedExecutors = new HashMap[String, ExecutorRunner] + val finishedExecutors = new LinkedHashMap[String, ExecutorRunner] val drivers = new HashMap[String, DriverRunner] val executors = new HashMap[String, ExecutorRunner] - val finishedDrivers = new HashMap[String, DriverRunner] + val finishedDrivers = new LinkedHashMap[String, DriverRunner] val appDirectories = new HashMap[String, Seq[String]] val finishedApps = new HashSet[String] + val retainedExecutors = conf.getInt("spark.worker.ui.retainedExecutors", +WorkerWebUI.DEFAULT_RETAINED_EXECUTORS) + val retainedDrivers = conf.getInt("spark.worker.ui.retainedDrivers", +WorkerWebUI.DEFAULT_RETAINED_DRIVERS) + // The shuffle service is not actually started unless configured. 
private val shuffleService = new ExternalShuffleService(conf, securityMgr) @@ -461,25 +466,7 @@ private[worker] class Worker( } case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) => - sendToMaster(executorStateChanged) - val fullId = appId + "/" + execId - if (ExecutorState.isFinished(state)) { -executors.get(fullId) match { - case Some(executor) => -logInfo("Executor " + fullId + " finished with state " + state + - message.map(" message " + _).getOrElse("") + - exitStatus.map(" exitStatus " + _).getOrElse("")) -executors -= fullId -finishedExecutors(fullId) = executor -coresUsed -= executor.cores -memoryUsed -= executor.memory - case None => -logInfo("Unknown Executor " + fullId + " finished with state " + state + - message.map(" message " + _).getOrElse("") + - exitStatus.map(" exitStatus " + _).getOrElse("")) -} -maybeCleanupApplication(appId) - } + handleExecutorStateChanged(executorStateChanged) case KillExecutor(masterUrl, appId, execId) => if (masterUrl != activeMasterUrl) { @@ -523,24 +510,8 @@ private[worker] class Worker( } } -
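The bookkeeping change keeps finished executors and drivers in a `LinkedHashMap`, whose insertion order makes "evict oldest first" cheap once the new `spark.worker.ui.retainedExecutors`/`retainedDrivers` caps are hit. A simplified sketch of the trimming step — the patch's batch-eviction details are omitted, and this removes exactly the surplus:
```
import scala.collection.mutable.LinkedHashMap

// Drop the oldest entries until at most `retained` remain.
def trimToRetained[K, V](finished: LinkedHashMap[K, V], retained: Int): Unit = {
  if (finished.size > retained) {
    val surplus = finished.size - retained
    // Materialize the keys first: removing while iterating would be unsafe.
    finished.keys.take(surplus).toList.foreach(finished.remove)
  }
}
```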
spark git commit: [SPARK-9481] Add logLikelihood to LocalLDAModel
Repository: spark Updated Branches: refs/heads/master d04634701 -> a8340fa7d [SPARK-9481] Add logLikelihood to LocalLDAModel jkbradley Exposes `bound` (variational log likelihood bound) through public API as `logLikelihood`. Also adds unit tests, some DRYing of `LDASuite`, and includes unit tests mentioned in #7760 Author: Feynman Liang Closes #7801 from feynmanliang/SPARK-9481-logLikelihood and squashes the following commits: 6d1b2c9 [Feynman Liang] Negate perplexity definition 5f62b20 [Feynman Liang] Add logLikelihood Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8340fa7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8340fa7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8340fa7 Branch: refs/heads/master Commit: a8340fa7df17e3f0a3658f8b8045ab840845a72a Parents: d046347 Author: Feynman Liang Authored: Fri Jul 31 12:12:22 2015 -0700 Committer: Joseph K. Bradley Committed: Fri Jul 31 12:12:22 2015 -0700 -- .../spark/mllib/clustering/LDAModel.scala | 20 ++- .../spark/mllib/clustering/LDASuite.scala | 129 ++- 2 files changed, 78 insertions(+), 71 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a8340fa7/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala index 82281a0..ff7035d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala @@ -217,22 +217,28 @@ class LocalLDAModel private[clustering] ( LocalLDAModel.SaveLoadV1_0.save(sc, path, topicsMatrix, docConcentration, topicConcentration, gammaShape) } - // TODO - // override def logLikelihood(documents: RDD[(Long, Vector)]): Double = ??? + + // TODO: declare in LDAModel and override once implemented in DistributedLDAModel + /** + * Calculates a lower bound on the log likelihood of the entire corpus. + * @param documents test corpus to use for calculating log likelihood + * @return variational lower bound on the log likelihood of the entire corpus + */ + def logLikelihood(documents: RDD[(Long, Vector)]): Double = bound(documents, +docConcentration, topicConcentration, topicsMatrix.toBreeze.toDenseMatrix, gammaShape, k, +vocabSize) /** - * Calculate the log variational bound on perplexity. See Equation (16) in original Online + * Calculate an upper bound on perplexity. See Equation (16) in original Online * LDA paper.
* @param documents test corpus to use for calculating perplexity - * @return the log perplexity per word + * @return variational upper bound on log perplexity per word */ def logPerplexity(documents: RDD[(Long, Vector)]): Double = { val corpusWords = documents .map { case (_, termCounts) => termCounts.toArray.sum } .sum() -val batchVariationalBound = bound(documents, docConcentration, - topicConcentration, topicsMatrix.toBreeze.toDenseMatrix, gammaShape, k, vocabSize) -val perWordBound = batchVariationalBound / corpusWords +val perWordBound = -logLikelihood(documents) / corpusWords perWordBound } http://git-wip-us.apache.org/repos/asf/spark/blob/a8340fa7/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index 695ee3b..79d2a1c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -210,16 +210,7 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext { } test("OnlineLDAOptimizer with toy data") { -def toydata: Array[(Long, Vector)] = Array( - Vectors.sparse(6, Array(0, 1), Array(1, 1)), - Vectors.sparse(6, Array(1, 2), Array(1, 1)), - Vectors.sparse(6, Array(0, 2), Array(1, 1)), - Vectors.sparse(6, Array(3, 4), Array(1, 1)), - Vectors.sparse(6, Array(3, 5), Array(1, 1)), - Vectors.sparse(6, Array(4, 5), Array(1, 1)) -).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) } - -val docs = sc.parallelize(toydata) +val docs = sc.parallelize(toyData) val op = new OnlineLDAOptimizer().setMiniBatchFraction(1).setTau0(1024).setKappa(0.51) .setGammaShape(1e10) val lda = new LDA().setK(2) @@ -242,30 +233,45 @@ class LDASuite extends SparkFunSuite with
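After the refactoring, the relationship between the two public methods is a one-liner: the per-word log perplexity is the negated variational log-likelihood bound divided by the corpus token count. As a plain-function sketch:
```
// logLikelihood is a *lower* bound on log p(corpus), so negating it and
// averaging per token yields an *upper* bound on per-word log perplexity.
def logPerplexityFromBound(logLikelihoodBound: Double, corpusTokenCount: Double): Double = {
  require(corpusTokenCount > 0, "perplexity is undefined for an empty corpus")
  -logLikelihoodBound / corpusTokenCount
}
```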
spark git commit: [SPARK-9504] [STREAMING] [TESTS] Use eventually to fix the flaky test
Repository: spark Updated Branches: refs/heads/master 3afc1de89 -> d04634701 [SPARK-9504] [STREAMING] [TESTS] Use eventually to fix the flaky test The previous code uses `ssc.awaitTerminationOrTimeout(500)`. Since nobody will stop it during `awaitTerminationOrTimeout`, it's just like `sleep(500)`. In a super overloaded Jenkins worker, the receiver may not be able to start in 500 milliseconds. Verified this in the log of https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39149/ There is no log about starting the receiver before this failure. That's why `assert(runningCount > 0)` failed. This PR replaces `awaitTerminationOrTimeout` with `eventually`, which should be more reliable. Author: zsxwing Closes #7823 from zsxwing/SPARK-9504 and squashes the following commits: 7af66a6 [zsxwing] Remove wrong assertion 5ba2c99 [zsxwing] Use eventually to fix the flaky test Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0463470 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0463470 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0463470 Branch: refs/heads/master Commit: d04634701413410938a133358fe1d9fbc077645e Parents: 3afc1de Author: zsxwing Authored: Fri Jul 31 12:10:55 2015 -0700 Committer: Tathagata Das Committed: Fri Jul 31 12:10:55 2015 -0700 -- .../apache/spark/streaming/StreamingContextSuite.scala| 10 +- 1 file changed, 5 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d0463470/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 84a5fbb..b7db280 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -261,7 +261,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo for (i <- 1 to 4) { logInfo("==\n\n\n") ssc = new StreamingContext(sc, Milliseconds(100)) - var runningCount = 0 + @volatile var runningCount = 0 TestReceiver.counter.set(1) val input = ssc.receiverStream(new TestReceiver) input.count().foreachRDD { rdd => @@ -270,14 +270,14 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo logInfo("Count = " + count + ", Running count = " + runningCount) } ssc.start() - ssc.awaitTerminationOrTimeout(500) + eventually(timeout(10.seconds), interval(10.millis)) { +assert(runningCount > 0) + } ssc.stop(stopSparkContext = false, stopGracefully = true) logInfo("Running count = " + runningCount) logInfo("TestReceiver.counter = " + TestReceiver.counter.get()) - assert(runningCount > 0) assert( -(TestReceiver.counter.get() == runningCount + 1) || - (TestReceiver.counter.get() == runningCount + 2), +TestReceiver.counter.get() == runningCount + 1, "Received records = " + TestReceiver.counter.get() + ", " + "processed records = " + runningCount )
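The fix's pattern in self-contained form, using ScalaTest's `Eventually`: retry the assertion every 10 ms until it passes or 10 seconds elapse, instead of asserting once after a fixed wait. The background thread here only simulates a receiver starting; it is not from the patch.
```
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

@volatile var runningCount = 0
// Stand-in for the receiver: the count becomes positive at some unknown time.
new Thread(new Runnable {
  def run(): Unit = { Thread.sleep(50); runningCount = 1 }
}).start()

// Polls the block until it passes or the timeout expires -- robust on slow CI.
eventually(timeout(10.seconds), interval(10.millis)) {
  assert(runningCount > 0)
}
```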
spark git commit: [SPARK-8564] [STREAMING] Add the Python API for Kinesis
Repository: spark Updated Branches: refs/heads/master 39ab199a3 -> 3afc1de89 [SPARK-8564] [STREAMING] Add the Python API for Kinesis This PR adds the Python API for Kinesis, including a Python example and a simple unit test. Author: zsxwing Closes #6955 from zsxwing/kinesis-python and squashes the following commits: e42e471 [zsxwing] Merge branch 'master' into kinesis-python 455f7ea [zsxwing] Remove streaming_kinesis_asl_assembly module and simply add the source folder to streaming_kinesis_asl module 32e6451 [zsxwing] Merge remote-tracking branch 'origin/master' into kinesis-python 5082d28 [zsxwing] Fix the syntax error for Python 2.6 fca416b [zsxwing] Fix wrong comparison 96670ff [zsxwing] Fix the compilation error after merging master 756a128 [zsxwing] Merge branch 'master' into kinesis-python 6c37395 [zsxwing] Print stack trace for debug 7c5cfb0 [zsxwing] RUN_KINESIS_TESTS -> ENABLE_KINESIS_TESTS cc9d071 [zsxwing] Fix the python test errors 466b425 [zsxwing] Add python tests for Kinesis e33d505 [zsxwing] Merge remote-tracking branch 'origin/master' into kinesis-python 3da2601 [zsxwing] Fix the kinesis folder 687446b [zsxwing] Fix the error message and the maven output path add2beb [zsxwing] Merge branch 'master' into kinesis-python 4957c0b [zsxwing] Add the Python API for Kinesis Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3afc1de8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3afc1de8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3afc1de8 Branch: refs/heads/master Commit: 3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0 Parents: 39ab199 Author: zsxwing Authored: Fri Jul 31 12:09:48 2015 -0700 Committer: Tathagata Das Committed: Fri Jul 31 12:09:48 2015 -0700 -- dev/run-tests.py| 3 +- dev/sparktestsupport/modules.py | 9 +- docs/streaming-kinesis-integration.md | 19 extras/kinesis-asl-assembly/pom.xml | 103 + .../examples/streaming/kinesis_wordcount_asl.py | 81 ++ .../streaming/kinesis/KinesisTestUtils.scala| 19 +++- .../spark/streaming/kinesis/KinesisUtils.scala | 78 ++--- pom.xml | 1 + project/SparkBuild.scala| 6 +- python/pyspark/streaming/kinesis.py | 112 +++ python/pyspark/streaming/tests.py | 86 +- 11 files changed, 492 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3afc1de8/dev/run-tests.py -- diff --git a/dev/run-tests.py b/dev/run-tests.py index 29420da..b6d1814 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -301,7 +301,8 @@ def build_spark_sbt(hadoop_version): sbt_goals = ["package", "assembly/assembly", "streaming-kafka-assembly/assembly", - "streaming-flume-assembly/assembly"] + "streaming-flume-assembly/assembly", + "streaming-kinesis-asl-assembly/assembly"] profiles_and_goals = build_profiles + sbt_goals print("[info] Building Spark (w/Hive 0.13.1) using SBT with these arguments: ", http://git-wip-us.apache.org/repos/asf/spark/blob/3afc1de8/dev/sparktestsupport/modules.py -- diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 44600cb..956dc81 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -138,6 +138,7 @@ streaming_kinesis_asl = Module( dependencies=[], source_file_regexes=[ "extras/kinesis-asl/", +"extras/kinesis-asl-assembly/", ], build_profile_flags=[ "-Pkinesis-asl", @@ -300,7 +301,13 @@ pyspark_sql = Module( pyspark_streaming = Module( name="pyspark-streaming", -dependencies=[pyspark_core, streaming, streaming_kafka, streaming_flume_assembly], +dependencies=[ 
+pyspark_core, +streaming, +streaming_kafka, +streaming_flume_assembly, +streaming_kinesis_asl +], source_file_regexes=[ "python/pyspark/streaming" ], http://git-wip-us.apache.org/repos/asf/spark/blob/3afc1de8/docs/streaming-kinesis-integration.md -- diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md index aa9749a..a7bcaec 100644 --- a/docs/streaming-kinesis-integration.md +++ b/docs/streaming-kinesis-integration.md @@ -52,6 +52,17 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m and the [example]({{site.SPARK_G
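For orientation, a hedged sketch of the Scala-side entry point that the new Python binding ultimately drives through py4j. The application name, stream name, endpoint, and region below are placeholders, and the exact `createStream` overload shown is an assumption based on the 1.5-era API:
```
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

def kinesisWordCount(ssc: StreamingContext): Unit = {
  // Each record arrives as Array[Byte]; decode, split, and count words.
  val records = KinesisUtils.createStream(
    ssc, "myAppName", "myStreamName", "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1", InitialPositionInStream.LATEST, Seconds(2),
    StorageLevel.MEMORY_AND_DISK_2)
  records.map(bytes => new String(bytes, "UTF-8"))
    .flatMap(_.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)
    .print()
}
```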
spark git commit: [SPARK-8640] [SQL] Enable Processing of Multiple Window Frames in a Single Window Operator
Repository: spark Updated Branches: refs/heads/master 0a1d2ca42 -> 39ab199a3 [SPARK-8640] [SQL] Enable Processing of Multiple Window Frames in a Single Window Operator This PR enables the processing of multiple window frames in a single window operator. This should improve the performance of processing multiple window expressions which share partition by/order by clauses, because it will be more efficient with respect to memory use and group processing. Author: Herman van Hovell Closes #7515 from hvanhovell/SPARK-8640 and squashes the following commits: f0e1c21 [Herman van Hovell] Changed Window Logical/Physical plans to use partition by/order by specs directly instead of using WindowSpec. e1711c2 [Herman van Hovell] Enabled the processing of multiple window frames in a single Window operator. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39ab199a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39ab199a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39ab199a Branch: refs/heads/master Commit: 39ab199a3f735b7658ab3331d3e2fb03441aec13 Parents: 0a1d2ca Author: Herman van Hovell Authored: Fri Jul 31 12:07:18 2015 -0700 Committer: Yin Huai Committed: Fri Jul 31 12:08:25 2015 -0700 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 12 +++- .../catalyst/plans/logical/basicOperators.scala | 3 ++- .../spark/sql/execution/SparkStrategies.scala| 5 +++-- .../org/apache/spark/sql/execution/Window.scala | 19 ++- .../spark/sql/hive/execution/HivePlanTest.scala | 18 ++ 5 files changed, 40 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/39ab199a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 265f3d1..51d910b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -347,7 +347,7 @@ class Analyzer( val newOutput = oldVersion.generatorOutput.map(_.newInstance()) (oldVersion, oldVersion.copy(generatorOutput = newOutput)) - case oldVersion @ Window(_, windowExpressions, _, child) + case oldVersion @ Window(_, windowExpressions, _, _, child) if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes) .nonEmpty => (oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions))) @@ -825,7 +825,7 @@ class Analyzer( }.asInstanceOf[NamedExpression] } - // Second, we group extractedWindowExprBuffer based on their Window Spec. + // Second, we group extractedWindowExprBuffer based on their Partition and Order Specs. val groupedWindowExpressions = extractedWindowExprBuffer.groupBy { expr => val distinctWindowSpec = expr.collect { case window: WindowExpression => window.windowSpec @@ -841,7 +841,8 @@ class Analyzer( failAnalysis(s"$expr has multiple Window Specifications ($distinctWindowSpec)."
+ s"Please file a bug report with this error message, stack trace, and the query.") } else { - distinctWindowSpec.head + val spec = distinctWindowSpec.head + (spec.partitionSpec, spec.orderSpec) } }.toSeq @@ -850,9 +851,10 @@ class Analyzer( var currentChild = child var i = 0 while (i < groupedWindowExpressions.size) { -val (windowSpec, windowExpressions) = groupedWindowExpressions(i) +val ((partitionSpec, orderSpec), windowExpressions) = groupedWindowExpressions(i) // Set currentChild to the newly created Window operator. -currentChild = Window(currentChild.output, windowExpressions, windowSpec, currentChild) +currentChild = Window(currentChild.output, windowExpressions, + partitionSpec, orderSpec, currentChild) // Move to next Window Spec. i += 1 http://git-wip-us.apache.org/repos/asf/spark/blob/39ab199a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index a67f8de..aacfc86 100644 --- a/sql/catalyst/src/main/scala/org/ap
spark git commit: [SPARK-8979] Add a PID based rate estimator
Repository: spark Updated Branches: refs/heads/master e8bdcdeab -> 0a1d2ca42 [SPARK-8979] Add a PID based rate estimator Based on #7600 /cc tdas Author: Iulian Dragos Author: François Garillot Closes #7648 from dragos/topic/streaming-bp/pid and squashes the following commits: aa5b097 [Iulian Dragos] Add more comments, made all PID constant parameters positive, a couple more tests. 93b74f8 [Iulian Dragos] Better explanation of historicalError. 7975b0c [Iulian Dragos] Add configuration for PID. 26cfd78 [Iulian Dragos] A couple of variable renames. d0bdf7c [Iulian Dragos] Update to latest version of the code, various style and name improvements. d58b845 [François Garillot] [SPARK-8979][Streaming] Implements a PIDRateEstimator Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a1d2ca4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a1d2ca4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a1d2ca4 Branch: refs/heads/master Commit: 0a1d2ca42c8b31d6b0e70163795f0185d4622f87 Parents: e8bdcde Author: Iulian Dragos Authored: Fri Jul 31 12:04:03 2015 -0700 Committer: Tathagata Das Committed: Fri Jul 31 12:04:03 2015 -0700 -- .../dstream/ReceiverInputDStream.scala | 2 +- .../scheduler/rate/PIDRateEstimator.scala | 124 + .../scheduler/rate/RateEstimator.scala | 18 ++- .../scheduler/rate/PIDRateEstimatorSuite.scala | 137 +++ 4 files changed, 276 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0a1d2ca4/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 646a8c3..670ef8d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -46,7 +46,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont */ override protected[streaming] val rateController: Option[RateController] = { if (RateController.isBackPressureEnabled(ssc.conf)) { - RateEstimator.create(ssc.conf).map { new ReceiverRateController(id, _) } + Some(new ReceiverRateController(id, RateEstimator.create(ssc.conf, ssc.graph.batchDuration))) } else { None } http://git-wip-us.apache.org/repos/asf/spark/blob/0a1d2ca4/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala new file mode 100644 index 000..6ae56a6 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.scheduler.rate + +/** + * Implements a proportional-integral-derivative (PID) controller which acts on + * the speed of ingestion of elements into Spark Streaming. A PID controller works + * by calculating an '''error''' between a measured output and a desired value. In the + * case of Spark Streaming the error is the difference between the measured processing + * rate (number of elements/processing delay) and the previous rate. + * + * @see https://en.wikipedia.org/wiki/PID_controller + * + * @param batchDurationMillis the batch duration, in milliseconds + * @param proportional how much the correction should depend on the current + *error. This term usually provides the bulk of correction and should be positive or zero. + *A value too l
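For intuition, a textbook PID step in standalone form; the constants, naming, and shape here are illustrative and deliberately simpler than the `PIDRateEstimator` added by the patch:
```
// One PID controller: error -> proportional + integral + derivative correction.
class SimplePid(kp: Double, ki: Double, kd: Double) {
  private var lastError = 0.0
  private var integral = 0.0

  /** Returns the correction to apply to the control signal (e.g. ingestion rate). */
  def step(setpoint: Double, measured: Double, dtSeconds: Double): Double = {
    val error = setpoint - measured
    integral += error * dtSeconds                     // accumulated past error
    val derivative = (error - lastError) / dtSeconds  // trend of the error
    lastError = error
    kp * error + ki * integral + kd * derivative
  }
}
```
A caller would add the returned correction to the current rate and clamp the result at zero, since a negative ingestion rate is meaningless.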
spark git commit: [SPARK-6885] [ML] decision tree support predict class probabilities
Repository: spark Updated Branches: refs/heads/master 4011a9471 -> e8bdcdeab [SPARK-6885] [ML] decision tree support predict class probabilities Decision trees now support predicting class probabilities. Implement the prediction-probabilities function, referring to the old DecisionTree API and the [sklearn API](https://github.com/scikit-learn/scikit-learn/blob/master/sklearn/tree/tree.py#L593). I made DecisionTreeClassificationModel inherit from ProbabilisticClassificationModel, made predictRaw return the raw counts vector, and made raw2probabilityInPlace/predictProbability return the probabilities for each prediction. Author: Yanbo Liang Closes #7694 from yanboliang/spark-6885 and squashes the following commits: 08d5b7f [Yanbo Liang] fix ImpurityStats null parameters and raw2probabilityInPlace sum = 0 issue 2174278 [Yanbo Liang] solve merge conflicts 7e90ba8 [Yanbo Liang] fix typos 33ae183 [Yanbo Liang] fix annotation ff043d3 [Yanbo Liang] raw2probabilityInPlace should operate in-place c32d6ce [Yanbo Liang] optimize calculateImpurityStats function again 6167fb0 [Yanbo Liang] optimize calculateImpurityStats function fbbe2ec [Yanbo Liang] eliminate duplicated struct and code beb1634 [Yanbo Liang] try to eliminate impurityStats for each LearningNode 99e8943 [Yanbo Liang] code optimization 5ec3323 [Yanbo Liang] implement InformationGainAndImpurityStats 227c91b [Yanbo Liang] refactor LearningNode to store ImpurityCalculator d746ffc [Yanbo Liang] decision tree support predict class probabilities Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8bdcdea Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8bdcdea Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8bdcdea Branch: refs/heads/master Commit: e8bdcdeabb2df139a656f86686cdb53c891b1f4b Parents: 4011a94 Author: Yanbo Liang Authored: Fri Jul 31 11:56:52 2015 -0700 Committer: Joseph K.
Bradley Committed: Fri Jul 31 11:56:52 2015 -0700 -- .../classification/DecisionTreeClassifier.scala | 40 -- .../spark/ml/classification/GBTClassifier.scala | 2 +- .../classification/RandomForestClassifier.scala | 2 +- .../ml/regression/DecisionTreeRegressor.scala | 2 +- .../spark/ml/regression/GBTRegressor.scala | 2 +- .../ml/regression/RandomForestRegressor.scala | 2 +- .../scala/org/apache/spark/ml/tree/Node.scala | 80 ++-- .../spark/ml/tree/impl/RandomForest.scala | 126 --- .../spark/mllib/tree/impurity/Entropy.scala | 2 +- .../apache/spark/mllib/tree/impurity/Gini.scala | 2 +- .../spark/mllib/tree/impurity/Impurity.scala| 2 +- .../spark/mllib/tree/impurity/Variance.scala| 2 +- .../mllib/tree/model/InformationGainStats.scala | 61 - .../DecisionTreeClassifierSuite.scala | 30 - .../ml/classification/GBTClassifierSuite.scala | 2 +- .../RandomForestClassifierSuite.scala | 2 +- 16 files changed, 229 insertions(+), 130 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e8bdcdea/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala index 36fe1bd..f27cfd0 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala @@ -18,12 +18,11 @@ package org.apache.spark.ml.classification import org.apache.spark.annotation.Experimental -import org.apache.spark.ml.{PredictionModel, Predictor} import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.tree.{DecisionTreeModel, DecisionTreeParams, Node, TreeClassifierParams} import org.apache.spark.ml.tree.impl.RandomForest import org.apache.spark.ml.util.{Identifiable, MetadataUtils} -import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy} import org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeModel} @@ -39,7 +38,7 @@ import org.apache.spark.sql.DataFrame */ @Experimental final class DecisionTreeClassifier(override val uid: String) - extends Predictor[Vector, DecisionTreeClassifier, DecisionTreeClassificationModel] + extends ProbabilisticClassifier[Vector, DecisionTreeClassifier, DecisionTreeClassificationModel] with DecisionTreeParams with TreeClassifierParams { def this() = this
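The raw-to-probability conversion described in this commit amounts to normalizing the leaf node's class-count vector in place. A minimal sketch, assuming a dense count vector (the committed model also guards against other vector types):

```
import org.apache.spark.mllib.linalg.{DenseVector, Vector}

// Sketch: turn a raw class-count vector into class probabilities in place.
// If all counts are zero the vector is left as-is, matching the
// "raw2probabilityInPlace sum = 0 issue" fix listed in the commits above.
def raw2probabilityInPlace(rawPrediction: Vector): Vector = rawPrediction match {
  case dv: DenseVector =>
    val total = dv.values.sum
    if (total != 0.0) {
      var i = 0
      while (i < dv.values.length) {
        dv.values(i) /= total
        i += 1
      }
    }
    dv
  case _ =>
    throw new UnsupportedOperationException("expected a dense vector of raw counts")
}
```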
spark git commit: [SPARK-9231] [MLLIB] DistributedLDAModel method for top topics per document
Repository: spark Updated Branches: refs/heads/master 6add4eddb -> 4011a9471 [SPARK-9231] [MLLIB] DistributedLDAModel method for top topics per document jira: https://issues.apache.org/jira/browse/SPARK-9231 Helper method in DistributedLDAModel of this form: ``` /** * For each document, return the top k weighted topics for that document. * return RDD of (doc ID, topic indices, topic weights) */ def topTopicsPerDocument(k: Int): RDD[(Long, Array[Int], Array[Double])] ``` Author: Yuhao Yang Closes #7785 from hhbyyh/topTopicsPerdoc and squashes the following commits: 30ad153 [Yuhao Yang] small fix fd24580 [Yuhao Yang] add topTopics per document to DistributedLDAModel Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4011a947 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4011a947 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4011a947 Branch: refs/heads/master Commit: 4011a947154d97a9ffb5a71f077481a12534d36b Parents: 6add4ed Author: Yuhao Yang Authored: Fri Jul 31 11:50:15 2015 -0700 Committer: Joseph K. Bradley Committed: Fri Jul 31 11:50:15 2015 -0700 -- .../apache/spark/mllib/clustering/LDAModel.scala | 19 ++- .../apache/spark/mllib/clustering/LDASuite.scala | 13 - 2 files changed, 30 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4011a947/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala index 6cfad3f..82281a0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.clustering -import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, normalize, sum} +import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argtopk, normalize, sum} import breeze.numerics.{exp, lgamma} import org.apache.hadoop.fs.Path import org.json4s.DefaultFormats @@ -591,6 +591,23 @@ class DistributedLDAModel private[clustering] ( JavaPairRDD.fromRDD(topicDistributions.asInstanceOf[RDD[(java.lang.Long, Vector)]]) } + /** + * For each document, return the top k weighted topics for that document and their weights. + * @return RDD of (doc ID, topic indices, topic weights) + */ + def topTopicsPerDocument(k: Int): RDD[(Long, Array[Int], Array[Double])] = { +graph.vertices.filter(LDA.isDocumentVertex).map { case (docID, topicCounts) => + val topIndices = argtopk(topicCounts, k) + val sumCounts = sum(topicCounts) + val weights = if (sumCounts != 0) { +topicCounts(topIndices) / sumCounts + } else { +topicCounts(topIndices) + } + (docID.toLong, topIndices.toArray, weights.toArray) +} + } + // TODO: // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ??? 
http://git-wip-us.apache.org/repos/asf/spark/blob/4011a947/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index c43e1e5..695ee3b 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.clustering -import breeze.linalg.{DenseMatrix => BDM, max, argmax} +import breeze.linalg.{DenseMatrix => BDM, argtopk, max, argmax} import org.apache.spark.SparkFunSuite import org.apache.spark.graphx.Edge @@ -108,6 +108,17 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext { assert(topicDistribution.toArray.sum ~== 1.0 absTol 1e-5) } +val top2TopicsPerDoc = model.topTopicsPerDocument(2).map(t => (t._1, (t._2, t._3))) +model.topicDistributions.join(top2TopicsPerDoc).collect().foreach { + case (docId, (topicDistribution, (indices, weights))) => +assert(indices.length == 2) +assert(weights.length == 2) +val bdvTopicDist = topicDistribution.toBreeze +val top2Indices = argtopk(bdvTopicDist, 2) +assert(top2Indices.toArray === indices) +assert(bdvTopicDist(top2Indices).toArray === weights) +} + // Check: log probabilities assert(model.logLikelihood < 0.0) assert(model.logPrior < 0.0)
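As a usage illustration of the new helper, assuming `model` is a trained DistributedLDAModel:

```
// Top 3 weighted topics for each document, as (docID, topic indices, topic weights).
val top3 = model.topTopicsPerDocument(3)
top3.take(5).foreach { case (docId, indices, weights) =>
  println(s"doc $docId -> topics ${indices.mkString(", ")} with weights ${weights.mkString(", ")}")
}
```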
spark git commit: [SPARK-9471] [ML] Multilayer Perceptron
Repository: spark Updated Branches: refs/heads/master 0024da915 -> 6add4eddb [SPARK-9471] [ML] Multilayer Perceptron This pull request contains the following feature for ML: - Multilayer Perceptron classifier This implementation is based on our initial pull request with bgreeven: https://github.com/apache/spark/pull/1290 and inspired by very insightful suggestions from mengxr and witgo (I would like to thank all other people from the mentioned thread for useful discussions). The original code was extensively tested and benchmarked. Since then, I've addressed two main requirements that prevented the code from merging into the main branch: - Extensible interface, so it will be easy to implement new types of networks - Main building blocks are traits `Layer` and `LayerModel`. They are used for constructing layers of ANN. New layers can be added by extending the `Layer` and `LayerModel` traits. These traits are private in this release in order to preserve a path for improving them based on community feedback - Back propagation is implemented in a general form, so the optimization algorithm does not need to change when new layers are implemented - Speed and scalability: this implementation has to be comparable in terms of speed to state-of-the-art single-node implementations. - The developed benchmark for large ANN shows that the proposed code is on par with a C++ CPU implementation and scales nicely with the number of workers. Details can be found here: https://github.com/avulanov/ann-benchmark - DBN and RBM by witgo https://github.com/witgo/spark/tree/ann-interface-gemm-dbn - Dropout https://github.com/avulanov/spark/tree/ann-interface-gemm mengxr and dbtsai kindly agreed to perform code review. Author: Alexander Ulanov Author: Bert Greevenbosch Closes #7621 from avulanov/SPARK-2352-ann and squashes the following commits: 4806b6f [Alexander Ulanov] Addressing reviewers comments. a7e7951 [Alexander Ulanov] Default blockSize: 100. Added documentation to blockSize parameter and DataStacker class f69bb3d [Alexander Ulanov] Addressing reviewers comments. 374bea6 [Alexander Ulanov] Moving ANN to ML package. GradientDescent constructor is now spark private. 43b0ae2 [Alexander Ulanov] Addressing reviewers comments. Adding multiclass test.
9d18469 [Alexander Ulanov] Addressing reviewers comments: unnecessary copy of data in predict 35125ab [Alexander Ulanov] Style fix in tests e191301 [Alexander Ulanov] Apache header a226133 [Alexander Ulanov] Multilayer Perceptron regressor and classifier Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6add4edd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6add4edd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6add4edd Branch: refs/heads/master Commit: 6add4eddb39e7748a87da3e921ea3c7881d30a82 Parents: 0024da9 Author: Alexander Ulanov Authored: Fri Jul 31 11:22:40 2015 -0700 Committer: Xiangrui Meng Committed: Fri Jul 31 11:23:30 2015 -0700 -- .../org/apache/spark/ml/ann/BreezeUtil.scala| 63 ++ .../scala/org/apache/spark/ml/ann/Layer.scala | 882 +++ .../MultilayerPerceptronClassifier.scala| 193  .../org/apache/spark/ml/param/params.scala | 5 + .../mllib/optimization/GradientDescent.scala| 2 +- .../org/apache/spark/ml/ann/ANNSuite.scala | 91 ++ .../MultilayerPerceptronClassifierSuite.scala | 91 ++ 7 files changed, 1326 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6add4edd/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala b/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala new file mode 100644 index 000..7429f9d --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.ann
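A minimal usage sketch of the new classifier; the layer sizes and the `train`/`test` DataFrames (with the standard "features"/"label" columns) are illustrative:

```
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier

// Network shape: 4 input features, one hidden layer of 5 units, 3 output classes.
val trainer = new MultilayerPerceptronClassifier()
  .setLayers(Array(4, 5, 3))
  .setBlockSize(100) // default block size discussed in the commits above
  .setMaxIter(100)
  .setSeed(11L)
val model = trainer.fit(train)
val predictions = model.transform(test)
```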
spark git commit: [SQL] address comments for to_date/trunc
Repository: spark Updated Branches: refs/heads/master 27ae851ce -> 0024da915 [SQL] address comments for to_date/trunc This PR addresses the comments in #7805 cc rxin Author: Davies Liu Closes #7817 from davies/trunc and squashes the following commits: f729d5f [Davies Liu] rollback cb7f7832 [Davies Liu] genCode() is protected 31e52ef [Davies Liu] fix style ed1edc7 [Davies Liu] address comments for #7805 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0024da91 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0024da91 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0024da91 Branch: refs/heads/master Commit: 0024da9157ba12ec84883a78441fa6835c1d0042 Parents: 27ae851 Author: Davies Liu Authored: Fri Jul 31 11:07:34 2015 -0700 Committer: Davies Liu Committed: Fri Jul 31 11:07:34 2015 -0700 -- .../sql/catalyst/expressions/datetimeFunctions.scala | 15 --- .../spark/sql/catalyst/util/DateTimeUtils.scala | 3 ++- .../catalyst/expressions/ExpressionEvalHelper.scala | 4 +--- .../main/scala/org/apache/spark/sql/functions.scala | 3 +++ 4 files changed, 14 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0024da91/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala index 6e76133..07dea5b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala @@ -726,15 +726,16 @@ case class TruncDate(date: Expression, format: Expression) override def dataType: DataType = DateType override def prettyName: String = "trunc" - lazy val minItemConst = DateTimeUtils.parseTruncLevel(format.eval().asInstanceOf[UTF8String]) + private lazy val truncLevel: Int = +DateTimeUtils.parseTruncLevel(format.eval().asInstanceOf[UTF8String]) override def eval(input: InternalRow): Any = { -val minItem = if (format.foldable) { - minItemConst +val level = if (format.foldable) { + truncLevel } else { DateTimeUtils.parseTruncLevel(format.eval().asInstanceOf[UTF8String]) } -if (minItem == -1) { +if (level == -1) { // unknown format null } else { @@ -742,7 +743,7 @@ case class TruncDate(date: Expression, format: Expression) if (d == null) { null } else { -DateTimeUtils.truncDate(d.asInstanceOf[Int], minItem) +DateTimeUtils.truncDate(d.asInstanceOf[Int], level) } } } @@ -751,7 +752,7 @@ case class TruncDate(date: Expression, format: Expression) val dtu = DateTimeUtils.getClass.getName.stripSuffix("$") if (format.foldable) { - if (minItemConst == -1) { + if (truncLevel == -1) { s""" boolean ${ev.isNull} = true; ${ctx.javaType(dataType)} ${ev.primitive} = ${ctx.defaultValue(dataType)}; @@ -763,7 +764,7 @@ case class TruncDate(date: Expression, format: Expression) boolean ${ev.isNull} = ${d.isNull}; ${ctx.javaType(dataType)} ${ev.primitive} = ${ctx.defaultValue(dataType)}; if (!${ev.isNull}) { -${ev.primitive} = $dtu.truncDate(${d.primitive}, $minItemConst); +${ev.primitive} = $dtu.truncDate(${d.primitive}, $truncLevel); } """ } http://git-wip-us.apache.org/repos/asf/spark/blob/0024da91/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala -- diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 5a7c25b..032ed8a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -794,7 +794,8 @@ object DateTimeUtils { } else if (level == TRUNC_TO_MONTH) { d - DateTimeUtils.getDayOfMonth(d) + 1 } else { - throw new Exception(s"Invalid trunc level: $level") + // caller make sure that this should never be reached + sys.error(s"Invalid trunc level: $level") } } http://git-wip-us.apache.org/repos/asf/spark/blob/0024da91/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala ---
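For context, TruncDate backs the new `trunc` function exposed in org.apache.spark.sql.functions; a small usage sketch (the `df` DataFrame and its `date` column are assumed):

```
import org.apache.spark.sql.functions.trunc

// Truncate a date to the first day of its month or year; an unrecognized
// format yields null, per the level == -1 branch above.
df.select(trunc(df("date"), "MM")).show()   // e.g. 2015-07-31 -> 2015-07-01
df.select(trunc(df("date"), "YEAR")).show() // e.g. 2015-07-31 -> 2015-01-01
```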
spark git commit: [SPARK-9446] Clear Active SparkContext in stop() method
Repository: spark Updated Branches: refs/heads/master 04a49edfd -> 27ae851ce [SPARK-9446] Clear Active SparkContext in stop() method In the thread 'stopped SparkContext remaining active' on the mailing list, Andres observed the following in the driver log: ``` 15/07/29 15:17:09 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster has disassociated: 15/07/29 15:17:09 INFO YarnClientSchedulerBackend: Shutting down all executors Exception in thread "Yarn application state monitor" org.apache.spark.SparkException: Error asking standalone scheduler to shut down executors at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:261) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:266) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.stop(YarnClientSchedulerBackend.scala:158) at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:416) at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1411) at org.apache.spark.SparkContext.stop(SparkContext.scala:1644) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:139) Caused by: java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325) at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208) at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:190)15/07/29 15:17:09 INFO YarnClientSchedulerBackend: Asking each executor to shut down at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102) at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:257) ... 6 more ``` The effect of the above exception is that a stopped SparkContext is returned to the user, since SparkContext.clearActiveContext() is not called.
Author: tedyu Closes #7756 from tedyu/master and squashes the following commits: 7339ff2 [tedyu] Move null assignment out of tryLogNonFatalError block 6e02cd9 [tedyu] Use Utils.tryLogNonFatalError to guard resource release f5fb519 [tedyu] Clear Active SparkContext in stop() method using finally Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27ae851c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27ae851c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27ae851c Branch: refs/heads/master Commit: 27ae851ce16082775ffbcb5b8fc6bdbe65dc70fc Parents: 04a49ed Author: tedyu Authored: Fri Jul 31 18:16:55 2015 +0100 Committer: Sean Owen Committed: Fri Jul 31 18:16:55 2015 +0100 -- .../scala/org/apache/spark/SparkContext.scala | 50 +++- 1 file changed, 37 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/27ae851c/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ac6ac6c..2d8aa25 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1689,33 +1689,57 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli Utils.removeShutdownHook(_shutdownHookRef) } -postApplicationEnd() -_ui.foreach(_.stop()) +Utils.tryLogNonFatalError { + postApplicationEnd() +} +Utils.tryLogNonFatalError { + _ui.foreach(_.stop()) +} if (env != null) { - env.metricsSystem.report() + Utils.tryLogNonFatalError { +env.metricsSystem.report() + } } if (metadataCleaner != null) { - metadataCleaner.cancel() + Utils.tryLogNonFatalError { +metadataCleaner.cancel() + } +} +Utils.tryLogNonFatalError { + _cleaner.foreach(_.stop()) +} +Utils.tryLogNonFatalError { + _executorAllocationManager.foreach(_.stop()) } -_cleaner.foreach(_.stop()) -_executorAllocationManager.foreach(_.stop()) if (_dagScheduler != null) { - _dagScheduler.stop()
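The guard used throughout the new stop() body, Utils.tryLogNonFatalError, behaves roughly as sketched below; Spark's actual helper reports through its Logging trait rather than stderr:

```
import scala.util.control.NonFatal

// Run one cleanup step; swallow and report non-fatal errors so a failing
// step cannot prevent the remaining shutdown steps from running.
def tryLogNonFatalError(block: => Unit): Unit = {
  try {
    block
  } catch {
    case NonFatal(t) =>
      System.err.println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
  }
}
```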
spark git commit: [SPARK-9446] Clear Active SparkContext in stop() method
Repository: spark Updated Branches: refs/heads/branch-1.4 3d6a9214e -> 5ad9f950c [SPARK-9446] Clear Active SparkContext in stop() method This is the branch-1.4 cherry-pick of the master commit above; the description, stack trace, and patch are the same, applied at branch-1.4 line offsets. Author: tedyu Closes #7756 from tedyu/master (cherry picked from commit 27ae851ce16082775ffbcb5b8fc6bdbe65dc70fc) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ad9f950 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ad9f950 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ad9f950 Branch: refs/heads/branch-1.4 Commit: 5ad9f950c4bd0042d79cdccb5277c10f8412be85 Parents: 3d6a921 Authored: Fri Jul 31 18:16:55 2015 +0100 Committer: Sean Owen Committed: Fri Jul 31 18:17:05 2015 +0100 -- .../scala/org/apache/spark/SparkContext.scala | 50 +++- 1 file changed, 37 insertions(+), 13 deletions(-)
spark git commit: [SPARK-9497] [SPARK-9509] [CORE] Use ask instead of askWithRetry
Repository: spark Updated Branches: refs/heads/master fc0e57e5a -> 04a49edfd [SPARK-9497] [SPARK-9509] [CORE] Use ask instead of askWithRetry `RpcEndpointRef.askWithRetry` throws `SparkException` rather than `TimeoutException`. Use `ask` instead, because we don't need to retry here. Author: zsxwing Closes #7824 from zsxwing/SPARK-9497 and squashes the following commits: 7bfc2b4 [zsxwing] Use ask instead of askWithRetry Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04a49edf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04a49edf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04a49edf Branch: refs/heads/master Commit: 04a49edfdb606c01fa4f8ae6e730ec4f9bd0cb6d Parents: fc0e57e Author: zsxwing Authored: Fri Jul 31 09:34:10 2015 -0700 Committer: Kay Ousterhout Committed: Fri Jul 31 09:34:16 2015 -0700 -- .../main/scala/org/apache/spark/deploy/client/AppClient.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/04a49edf/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 79b251e..a659abf 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -27,7 +27,7 @@ import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master import org.apache.spark.rpc._ -import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils} /** * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL, @@ -248,7 +248,8 @@ private[spark] class AppClient( def stop() { if (endpoint != null) { try { -endpoint.askWithRetry[Boolean](StopAppClient) +val timeout = RpcUtils.askRpcTimeout(conf) +timeout.awaitResult(endpoint.ask[Boolean](StopAppClient)) } catch { case e: TimeoutException => logInfo("Stop request to Master timed out; it may already be shut down.")
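Pulled together from the diff above, the new calling pattern is a single send with an explicit await, so a dead master surfaces as a TimeoutException that the caller logs instead of retrying (a restatement of the committed change; `endpoint`, `conf`, and `logInfo` come from the surrounding class):

```
val timeout = RpcUtils.askRpcTimeout(conf)
try {
  // Send StopAppClient once; no retries against a possibly-dead master.
  timeout.awaitResult(endpoint.ask[Boolean](StopAppClient))
} catch {
  case e: TimeoutException =>
    logInfo("Stop request to Master timed out; it may already be shut down.")
}
```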
spark git commit: [SPARK-9053] [SPARKR] Fix spaces around parens, infix operators etc.
Repository: spark Updated Branches: refs/heads/master 6bba7509a -> fc0e57e5a [SPARK-9053] [SPARKR] Fix spaces around parens, infix operators etc. ### JIRA [[SPARK-9053] Fix spaces around parens, infix operators etc. - ASF JIRA](https://issues.apache.org/jira/browse/SPARK-9053) ### The Result of `lint-r` [The result of lint-r at the revision: a4c83cb1e4b066cd60264b6572fd3e51d160d26a](https://gist.github.com/yu-iskw/d253d7f8ef351f86443d) Author: Yu ISHIKAWA Closes #7584 from yu-iskw/SPARK-9053 and squashes the following commits: 613170f [Yu ISHIKAWA] Ignore a warning about a space before a left parentheses ede61e1 [Yu ISHIKAWA] Ignores two warnings about a space before a left parentheses. TODO: After updating `lintr`, we will remove the ignores de3e0db [Yu ISHIKAWA] Add '## nolint start' & '## nolint end' statement to ignore infix space warnings e233ea8 [Yu ISHIKAWA] [SPARK-9053][SparkR] Fix spaces around parens, infix operators etc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc0e57e5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc0e57e5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc0e57e5 Branch: refs/heads/master Commit: fc0e57e5aba82a3f227fef05a843283e2ec893fc Parents: 6bba750 Author: Yu ISHIKAWA Authored: Fri Jul 31 09:33:38 2015 -0700 Committer: Shivaram Venkataraman Committed: Fri Jul 31 09:33:38 2015 -0700 -- R/pkg/R/DataFrame.R | 4 R/pkg/R/RDD.R | 7 +-- R/pkg/R/column.R| 2 +- R/pkg/R/context.R | 2 +- R/pkg/R/pairRDD.R | 2 +- R/pkg/R/utils.R | 4 ++-- R/pkg/inst/tests/test_binary_function.R | 2 +- R/pkg/inst/tests/test_rdd.R | 6 +++--- R/pkg/inst/tests/test_sparkSQL.R| 4 +++- 9 files changed, 21 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fc0e57e5/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index f4c93d3..b31ad37 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1322,9 +1322,11 @@ setMethod("write.df", "org.apache.spark.sql.parquet") } allModes <- c("append", "overwrite", "error", "ignore") +# nolint start if (!(mode %in% allModes)) { stop('mode should be one of "append", "overwrite", "error", "ignore"') } +# nolint end jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode) options <- varargsToEnv(...) if (!is.null(path)) { @@ -1384,9 +1386,11 @@ setMethod("saveAsTable", "org.apache.spark.sql.parquet") } allModes <- c("append", "overwrite", "error", "ignore") +# nolint start if (!(mode %in% allModes)) { stop('mode should be one of "append", "overwrite", "error", "ignore"') } +# nolint end jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode) options <- varargsToEnv(...) 
callJMethod(df@sdf, "saveAsTable", tableName, source, jmode, options) http://git-wip-us.apache.org/repos/asf/spark/blob/fc0e57e5/R/pkg/R/RDD.R -- diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R index d2d0967..2a013b3 100644 --- a/R/pkg/R/RDD.R +++ b/R/pkg/R/RDD.R @@ -85,7 +85,9 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) isPipelinable <- function(rdd) { e <- rdd@env +# nolint start !(e$isCached || e$isCheckpointed) +# nolint end } if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) { @@ -97,7 +99,8 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) # prev_serializedMode is used during the delayed computation of JRDD in getJRDD } else { pipelinedFunc <- function(partIndex, part) { - func(partIndex, prev@func(partIndex, part)) + f <- prev@func + func(partIndex, f(partIndex, part)) } .Object@func <- cleanClosure(pipelinedFunc) .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline @@ -841,7 +844,7 @@ setMethod("sampleRDD", if (withReplacement) { count <- rpois(1, fraction) if (count > 0) { -res[(len + 1):(len + count)] <- rep(list(elem), count) +res[ (len + 1) : (len + count) ] <- rep(list(elem), count) len <- len + count }
spark git commit: [SPARK-9500] add TernaryExpression to simplify ternary expressions
Repository: spark Updated Branches: refs/heads/master a3a85d73d -> 6bba7509a [SPARK-9500] add TernaryExpression to simplify ternary expressions There is a lot of duplicated code in ternary expressions; this creates a TernaryExpression base class for them to reduce the duplication. cc chenghao-intel Author: Davies Liu Closes #7816 from davies/ternary and squashes the following commits: ed2bf76 [Davies Liu] add TernaryExpression Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6bba7509 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6bba7509 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6bba7509 Branch: refs/heads/master Commit: 6bba7509a932aa4d39266df2d15b1370b7aabbec Parents: a3a85d7 Author: Davies Liu Authored: Fri Jul 31 08:28:05 2015 -0700 Committer: Davies Liu Committed: Fri Jul 31 08:28:05 2015 -0700 -- .../sql/catalyst/expressions/Expression.scala | 85 + .../expressions/codegen/CodeGenerator.scala | 2 +- .../spark/sql/catalyst/expressions/math.scala | 66 +--- .../catalyst/expressions/stringOperations.scala | 356 +-- 4 files changed, 183 insertions(+), 326 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6bba7509/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 8fc1826..2842b3e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -432,3 +432,88 @@ abstract class BinaryOperator extends BinaryExpression with ExpectsInputTypes { private[sql] object BinaryOperator { def unapply(e: BinaryOperator): Option[(Expression, Expression)] = Some((e.left, e.right)) } + +/** + * An expression with three inputs and one output. The output is by default evaluated to null + * if any input is evaluated to null. + */ +abstract class TernaryExpression extends Expression { + + override def foldable: Boolean = children.forall(_.foldable) + + override def nullable: Boolean = children.exists(_.nullable) + + /** + * Default behavior of evaluation according to the default nullability of BinaryExpression. + * If subclass of BinaryExpression override nullable, probably should also override this. + */ + override def eval(input: InternalRow): Any = { +val exprs = children +val value1 = exprs(0).eval(input) +if (value1 != null) { + val value2 = exprs(1).eval(input) + if (value2 != null) { +val value3 = exprs(2).eval(input) +if (value3 != null) { + return nullSafeEval(value1, value2, value3) +} + } +} +null + } + + /** + * Called by default [[eval]] implementation. If subclass of BinaryExpression keep the default + * nullability, they can override this method to save null-check code. If we need full control + * of evaluation process, we should override [[eval]]. + */ + protected def nullSafeEval(input1: Any, input2: Any, input3: Any): Any = +sys.error(s"BinaryExpressions must override either eval or nullSafeEval") + + /** + * Short hand for generating binary evaluation code. + * If either of the sub-expressions is null, the result of this computation + * is assumed to be null. + * + * @param f accepts two variable names and returns Java code to compute the output. 
+ */ + protected def defineCodeGen( +ctx: CodeGenContext, +ev: GeneratedExpressionCode, +f: (String, String, String) => String): String = { +nullSafeCodeGen(ctx, ev, (eval1, eval2, eval3) => { + s"${ev.primitive} = ${f(eval1, eval2, eval3)};" +}) + } + + /** + * Short hand for generating binary evaluation code. + * If either of the sub-expressions is null, the result of this computation + * is assumed to be null. + * + * @param f function that accepts the 2 non-null evaluation result names of children + * and returns Java code to compute the output. + */ + protected def nullSafeCodeGen( +ctx: CodeGenContext, +ev: GeneratedExpressionCode, +f: (String, String, String) => String): String = { +val evals = children.map(_.gen(ctx)) +val resultCode = f(evals(0).primitive, evals(1).primitive, evals(2).primitive) +s""" + ${evals(0).code} + boolean ${ev.isNull} = true; + ${ctx.javaType(dataType)} ${ev.primitive} = ${ctx.defaultValue(dataType)}; + if (!${evals(0).isNull}) { +${evals(1).code} +if (!${evals(1).isNull}) { + ${evals(2)
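To show how the new base class removes boilerplate, here is a hypothetical ternary expression built on it. `Clamp` is an illustrative example, not part of the commit (imports from org.apache.spark.sql.catalyst.expressions and org.apache.spark.sql.types assumed); null propagation comes from TernaryExpression, so only the non-null logic needs writing:

```
// Hypothetical clamp(value, lo, hi) on top of TernaryExpression: the base
// class supplies foldability, nullability, and the null-check plumbing.
case class Clamp(value: Expression, lo: Expression, hi: Expression)
  extends TernaryExpression {

  override def children: Seq[Expression] = Seq(value, lo, hi)
  override def dataType: DataType = DoubleType

  // Only the non-null arithmetic is written by hand.
  override protected def nullSafeEval(v: Any, l: Any, h: Any): Any =
    math.min(math.max(v.asInstanceOf[Double], l.asInstanceOf[Double]), h.asInstanceOf[Double])

  // Codegen reuses the defineCodeGen helper shown in the diff above.
  override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String =
    defineCodeGen(ctx, ev, (v, l, h) => s"java.lang.Math.min(java.lang.Math.max($v, $l), $h)")
}
```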