spark git commit: [SPARK-9010] [DOCUMENTATION] Improve the Spark Configuration document about `spark.kryoserializer.buffer`
Repository: spark
Updated Branches:
  refs/heads/master 20c1434a8 -> c1feebd8f


[SPARK-9010] [DOCUMENTATION] Improve the Spark Configuration document about `spark.kryoserializer.buffer`

The meaning of spark.kryoserializer.buffer should be "Initial size of Kryo's serialization buffer. Note that there will be one buffer per core on each worker. This buffer will grow up to spark.kryoserializer.buffer.max if needed.".

The spark.kryoserializer.buffer.max.mb is out-of-date in spark 1.4.

Author: zhaishidan <zhaishi...@haizhi.com>

Closes #7393 from stanzhai/master and squashes the following commits:

69729ef [zhaishidan] fix document error about spark.kryoserializer.buffer.max.mb


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1feebd8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1feebd8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1feebd8

Branch: refs/heads/master
Commit: c1feebd8fcba985667db8ccdafd2b5ec76dcfae7
Parents: 20c1434
Author: zhaishidan <zhaishi...@haizhi.com>
Authored: Tue Jul 14 08:54:30 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Jul 14 08:54:30 2015 +0100

----------------------------------------------------------------------
 docs/configuration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c1feebd8/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 443322e..8a186ee 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -665,7 +665,7 @@ Apart from these, the following properties are also available, and may be useful
   <td>
     Initial size of Kryo's serialization buffer. Note that there will be one buffer
      <i>per core</i> on each worker. This buffer will grow up to
-     <code>spark.kryoserializer.buffer.max.mb</code> if needed.
+     <code>spark.kryoserializer.buffer.max</code> if needed.
   </td>
 </tr>
 <tr>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-9010] [DOCUMENTATION] Improve the Spark Configuration document about `spark.kryoserializer.buffer`
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 50607eca5 -> dce68ad1a


[SPARK-9010] [DOCUMENTATION] Improve the Spark Configuration document about `spark.kryoserializer.buffer`

The meaning of spark.kryoserializer.buffer should be "Initial size of Kryo's serialization buffer. Note that there will be one buffer per core on each worker. This buffer will grow up to spark.kryoserializer.buffer.max if needed.".

The spark.kryoserializer.buffer.max.mb is out-of-date in spark 1.4.

Author: zhaishidan <zhaishi...@haizhi.com>

Closes #7393 from stanzhai/master and squashes the following commits:

69729ef [zhaishidan] fix document error about spark.kryoserializer.buffer.max.mb

(cherry picked from commit c1feebd8fcba985667db8ccdafd2b5ec76dcfae7)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dce68ad1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dce68ad1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dce68ad1

Branch: refs/heads/branch-1.4
Commit: dce68ad1a0da5580179d1300d4262b9648babcda
Parents: 50607ec
Author: zhaishidan <zhaishi...@haizhi.com>
Authored: Tue Jul 14 08:54:30 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Jul 14 08:54:59 2015 +0100

----------------------------------------------------------------------
 docs/configuration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dce68ad1/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 19f3b7e..e60b0f5 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -665,7 +665,7 @@ Apart from these, the following properties are also available, and may be useful
   <td>
     Initial size of Kryo's serialization buffer. Note that there will be one buffer
      <i>per core</i> on each worker. This buffer will grow up to
-     <code>spark.kryoserializer.buffer.max.mb</code> if needed.
+     <code>spark.kryoserializer.buffer.max</code> if needed.
   </td>
 </tr>
 <tr>
spark git commit: [SPARK-9001] Fixing errors in javadocs that lead to failed build/sbt doc
Repository: spark
Updated Branches:
  refs/heads/master 408b384de -> 20c1434a8


[SPARK-9001] Fixing errors in javadocs that lead to failed build/sbt doc

These are minor corrections in the documentation of several classes that are preventing:

```bash
build/sbt publish-local
```

I believe this might be an issue associated with running JDK8 as ankurdave does not appear to have this issue in JDK7.

Author: Joseph Gonzalez <joseph.e.gonza...@gmail.com>

Closes #7354 from jegonzal/FixingJavadocErrors and squashes the following commits:

6664b7e [Joseph Gonzalez] making requested changes
2e16d89 [Joseph Gonzalez] Fixing errors in javadocs that prevents build/sbt publish-local from completing.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20c1434a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20c1434a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20c1434a

Branch: refs/heads/master
Commit: 20c1434a8dbb25b98f6b434b158ae88e44ce3057
Parents: 408b384
Author: Joseph Gonzalez <joseph.e.gonza...@gmail.com>
Authored: Tue Jul 14 00:32:29 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Jul 14 00:32:29 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/spark/launcher/SparkLauncher.java | 5 +++--
 .../main/java/org/apache/spark/launcher/package-info.java | 10 +++---
 .../main/java/org/apache/spark/unsafe/bitset/BitSet.java | 2 +-
 .../org/apache/spark/unsafe/bitset/BitSetMethods.java | 2 +-
 .../java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 6 +-
 .../java/org/apache/spark/unsafe/types/UTF8String.java | 8
 6 files changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/20c1434a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index d4cfeac..c0f89c9 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -25,11 +25,12 @@ import java.util.Map;
 
 import static org.apache.spark.launcher.CommandBuilderUtils.*;
 
-/** 
+/**
  * Launcher for Spark applications.
- * <p/>
+ * <p>
  * Use this class to start Spark applications programmatically. The class uses a builder pattern
  * to allow clients to configure the Spark application and launch it as a child process.
+ * </p>
  */
 public class SparkLauncher {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/20c1434a/launcher/src/main/java/org/apache/spark/launcher/package-info.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/package-info.java b/launcher/src/main/java/org/apache/spark/launcher/package-info.java
index 7ed756f..7c97dba 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/package-info.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/package-info.java
@@ -17,13 +17,17 @@
 
 /**
  * Library for launching Spark applications.
- * <p/>
+ *
+ * <p>
  * This library allows applications to launch Spark programmatically. There's only one entry
  * point to the library - the {@link org.apache.spark.launcher.SparkLauncher} class.
- * <p/>
+ * </p>
+ *
+ * <p>
  * To launch a Spark application, just instantiate a {@link org.apache.spark.launcher.SparkLauncher}
  * and configure the application to run. For example:
- *
+ * </p>
+ *
  * <pre>
  * {@code
  *   import org.apache.spark.launcher.SparkLauncher;

http://git-wip-us.apache.org/repos/asf/spark/blob/20c1434a/unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSet.java
----------------------------------------------------------------------
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSet.java b/unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSet.java
index 28e23da..7c12417 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSet.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSet.java
@@ -90,7 +90,7 @@ public final class BitSet {
    * To iterate over the true bits in a BitSet, use the following loop:
    * <pre>
    * <code>
-   *  for (long i = bs.nextSetBit(0); i >= 0; i = bs.nextSetBit(i + 1)) {
+   *  for (long i = bs.nextSetBit(0); i &gt;= 0; i = bs.nextSetBit(i + 1)) {
    *    // operate on index i here
    *  }
    * </code>

http://git-wip-us.apache.org/repos/asf/spark/blob/20c1434a/unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSetMethods.java
----------------------------------------------------------------------
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSetMethods.java
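The iteration idiom documented in the BitSet javadoc above can be tried out in isolation. The following is a minimal sketch that uses `java.util.BitSet` from the JDK as a stand-in for Spark's `org.apache.spark.unsafe.bitset.BitSet` (an assumption made so the example runs without Spark on the classpath); the class name `SetBitIteration` and the helper `setBits` are hypothetical.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical demo class; java.util.BitSet stands in for Spark's own BitSet.
class SetBitIteration {

    /** Collects the indices of all set bits, lowest index first. */
    static List<Integer> setBits(BitSet bs) {
        List<Integer> indices = new ArrayList<>();
        // nextSetBit returns -1 once no further set bit exists, which ends the loop.
        for (int i = bs.nextSetBit(0); i >= 0; i = bs.nextSetBit(i + 1)) {
            indices.add(i);
        }
        return indices;
    }

    public static void main(String[] args) {
        BitSet bs = new BitSet(16);
        bs.set(1);
        bs.set(4);
        bs.set(9);
        System.out.println(setBits(bs)); // prints [1, 4, 9]
    }
}
```

Inside a javadoc comment the `>=` in this loop has to be written as `&gt;=`, which is the correction the diff makes.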
spark git commit: [SPARK-6851] [SQL] function least/greatest follow up
Repository: spark
Updated Branches:
  refs/heads/master c1feebd8f -> 257236c3e


[SPARK-6851] [SQL] function least/greatest follow up

This is a follow up of remaining comments from #6851

Author: Daoyuan Wang <daoyuan.w...@intel.com>

Closes #7387 from adrian-wang/udflgfollow and squashes the following commits:

6163e62 [Daoyuan Wang] add skipping null values
e8c2e09 [Daoyuan Wang] use seq
8362966 [Daoyuan Wang] pr6851 follow up


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/257236c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/257236c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/257236c3

Branch: refs/heads/master
Commit: 257236c3e17906098f801cbc2059e7a9054e8cab
Parents: c1feebd
Author: Daoyuan Wang <daoyuan.w...@intel.com>
Authored: Tue Jul 14 01:09:33 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Jul 14 01:09:33 2015 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/conditionals.scala | 16 +++-
 .../ConditionalExpressionSuite.scala | 79 ++--
 .../scala/org/apache/spark/sql/functions.scala | 16 ++--
 3 files changed, 62 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/257236c3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
index 84c28c2..eea7706 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
@@ -311,7 +311,11 @@ case class CaseKeyWhen(key: Expression, branches: Seq[Expression]) extends CaseW
   }
 }
 
-case class Least(children: Expression*) extends Expression {
+/**
+ * A function that returns the least value of all parameters, skipping null values.
+ * It takes at least 2 parameters, and returns null iff all parameters are null.
+ */
+case class Least(children: Seq[Expression]) extends Expression {
   require(children.length > 1, "LEAST requires at least 2 arguments, got " + children.length)
 
   override def nullable: Boolean = children.forall(_.nullable)
@@ -356,12 +360,16 @@ case class Least(children: Expression*) extends Expression {
       ${evalChildren.map(_.code).mkString("\n")}
       boolean ${ev.isNull} = true;
       ${ctx.javaType(dataType)} ${ev.primitive} = ${ctx.defaultValue(dataType)};
-      ${(0 until children.length).map(updateEval).mkString("\n")}
+      ${children.indices.map(updateEval).mkString("\n")}
     """
   }
 }
 
-case class Greatest(children: Expression*) extends Expression {
+/**
+ * A function that returns the greatest value of all parameters, skipping null values.
+ * It takes at least 2 parameters, and returns null iff all parameters are null.
+ */
+case class Greatest(children: Seq[Expression]) extends Expression {
   require(children.length > 1, "GREATEST requires at least 2 arguments, got " + children.length)
 
   override def nullable: Boolean = children.forall(_.nullable)
@@ -406,7 +414,7 @@ case class Greatest(children: Expression*) extends Expression {
       ${evalChildren.map(_.code).mkString("\n")}
       boolean ${ev.isNull} = true;
       ${ctx.javaType(dataType)} ${ev.primitive} = ${ctx.defaultValue(dataType)};
-      ${(0 until children.length).map(updateEval).mkString("\n")}
+      ${children.indices.map(updateEval).mkString("\n")}
     """
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/257236c3/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ConditionalExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ConditionalExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ConditionalExpressionSuite.scala
index adadc8c..afa143b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ConditionalExpressionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ConditionalExpressionSuite.scala
@@ -144,35 +144,35 @@ class ConditionalExpressionSuite extends SparkFunSuite with ExpressionEvalHelper
     val c3 = 'a.string.at(2)
     val c4 = 'a.string.at(3)
     val c5 = 'a.string.at(4)
-    checkEvaluation(Least(c4, c3, c5), "a", row)
-    checkEvaluation(Least(c1, c2), 1, row)
-    checkEvaluation(Least(c1, c2, Literal(-1)), -1, row)
-    checkEvaluation(Least(c4, c5, c3, c3, Literal("a")), "a", row)
-
-    checkEvaluation(Least(Literal(null), Literal(null)), null,
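The null-handling contract stated in the new Scaladoc for `Least` and `Greatest` (skip nulls, return null only if every argument is null, require at least two arguments) can be illustrated outside Catalyst. This is a minimal sketch over plain Java values rather than Catalyst expressions; the class `LeastGreatestSketch` and its methods are hypothetical and are not part of Spark.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the documented semantics; not Spark code.
class LeastGreatestSketch {

    /** Returns the smallest non-null value, or null if every value is null. */
    static <T extends Comparable<T>> T least(List<T> values) {
        requireTwo("LEAST", values);
        T result = null;
        for (T v : values) {
            // Null inputs are skipped; the first non-null value seeds the result.
            if (v != null && (result == null || v.compareTo(result) < 0)) {
                result = v;
            }
        }
        return result;
    }

    /** Returns the largest non-null value, or null if every value is null. */
    static <T extends Comparable<T>> T greatest(List<T> values) {
        requireTwo("GREATEST", values);
        T result = null;
        for (T v : values) {
            if (v != null && (result == null || v.compareTo(result) > 0)) {
                result = v;
            }
        }
        return result;
    }

    private static void requireTwo(String name, List<?> values) {
        if (values.size() < 2) {
            throw new IllegalArgumentException(
                name + " requires at least 2 arguments, got " + values.size());
        }
    }

    public static void main(String[] args) {
        Integer smallest = least(Arrays.asList(3, null, 1));
        String largest = greatest(Arrays.asList("a", null, "c"));
        Integer allNull = least(Arrays.<Integer>asList(null, null));
        System.out.println(smallest); // prints 1
        System.out.println(largest);  // prints c
        System.out.println(allNull);  // prints null
    }
}
```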
spark git commit: [SPARK-9029] [SQL] shortcut CaseKeyWhen if key is null
Repository: spark
Updated Branches:
  refs/heads/master 257236c3e -> 59d820aa8


[SPARK-9029] [SQL] shortcut CaseKeyWhen if key is null

Author: Wenchen Fan <cloud0...@outlook.com>

Closes #7389 from cloud-fan/case-when and squashes the following commits:

ea4b6ba [Wenchen Fan] shortcut for case key when


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59d820aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59d820aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59d820aa

Branch: refs/heads/master
Commit: 59d820aa8dec08b744971237860b4c6bef577ddf
Parents: 257236c
Author: Wenchen Fan <cloud0...@outlook.com>
Authored: Tue Jul 14 10:20:15 2015 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Tue Jul 14 10:20:15 2015 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/conditionals.scala | 48 ++--
 1 file changed, 24 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/59d820aa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
index eea7706..c7f039e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
@@ -230,24 +230,31 @@ case class CaseKeyWhen(key: Expression, branches: Seq[Expression]) extends CaseW
     }
   }
 
+  private def evalElse(input: InternalRow): Any = {
+    if (branchesArr.length % 2 == 0) {
+      null
+    } else {
+      branchesArr(branchesArr.length - 1).eval(input)
+    }
+  }
+
   /** Written in imperative fashion for performance considerations. */
   override def eval(input: InternalRow): Any = {
     val evaluatedKey = key.eval(input)
-    val len = branchesArr.length
-    var i = 0
-    // If all branches fail and an elseVal is not provided, the whole statement
-    // defaults to null, according to Hive's semantics.
-    while (i < len - 1) {
-      if (threeValueEquals(evaluatedKey, branchesArr(i).eval(input))) {
-        return branchesArr(i + 1).eval(input)
+    // If key is null, we can just return the else part or null if there is no else.
+    // If key is not null but doesn't match any when part, we need to return
+    // the else part or null if there is no else, according to Hive's semantics.
+    if (evaluatedKey != null) {
+      val len = branchesArr.length
+      var i = 0
+      while (i < len - 1) {
+        if (evaluatedKey == branchesArr(i).eval(input)) {
+          return branchesArr(i + 1).eval(input)
+        }
+        i += 2
       }
-      i += 2
     }
-    var res: Any = null
-    if (i == len - 1) {
-      res = branchesArr(i).eval(input)
-    }
-    return res
+    evalElse(input)
   }
 
   override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
@@ -261,8 +268,7 @@ case class CaseKeyWhen(key: Expression, branches: Seq[Expression]) extends CaseW
         s"""
           if (!$got) {
             ${cond.code}
-            if (!${keyEval.isNull} && !${cond.isNull}
-             && ${ctx.genEqual(key.dataType, keyEval.primitive, cond.primitive)}) {
+            if (!${cond.isNull} && ${ctx.genEqual(key.dataType, keyEval.primitive, cond.primitive)}) {
               $got = true;
               ${res.code}
               ${ev.isNull} = ${res.isNull};
@@ -290,19 +296,13 @@ case class CaseKeyWhen(key: Expression, branches: Seq[Expression]) extends CaseW
       boolean ${ev.isNull} = true;
       ${ctx.javaType(dataType)} ${ev.primitive} = ${ctx.defaultValue(dataType)};
       ${keyEval.code}
-      $cases
+      if (!${keyEval.isNull}) {
+        $cases
+      }
       $other
     """
   }
 
-  private def threeValueEquals(l: Any, r: Any) = {
-    if (l == null || r == null) {
-      false
-    } else {
-      l == r
-    }
-  }
-
   override def toString: String = {
     s"CASE $key" + branches.sliding(2, 2).map {
       case Seq(cond, value) => s" WHEN $cond THEN $value"
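The control flow introduced by this change (skip all WHEN comparisons when the key is null, then fall through to the ELSE value or null) can be sketched without Catalyst. In the sketch below the branches are already-evaluated values held in a flat list `[when1, then1, when2, then2, ..., else?]`, which is a simplifying assumption; the real code evaluates `Expression` objects against an `InternalRow`. The class `CaseKeyWhenSketch` is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the null-key shortcut; not Spark code.
class CaseKeyWhenSketch {

    /** Returns the trailing ELSE value if the list has odd length, otherwise null. */
    static Object evalElse(List<Object> branches) {
        return branches.size() % 2 == 0 ? null : branches.get(branches.size() - 1);
    }

    /** Evaluates CASE key WHEN ... THEN ... [ELSE ...] END over plain values. */
    static Object eval(Object key, List<Object> branches) {
        // A null key can never equal any WHEN value, so the scan is skipped entirely.
        if (key != null) {
            int len = branches.size();
            for (int i = 0; i < len - 1; i += 2) {
                // equals(null) is false, so a null WHEN value never matches either.
                if (key.equals(branches.get(i))) {
                    return branches.get(i + 1);
                }
            }
        }
        return evalElse(branches);
    }

    public static void main(String[] args) {
        List<Object> branches = Arrays.<Object>asList(1, "one", 2, "two", "other");
        System.out.println(eval(2, branches));    // prints two
        System.out.println(eval(null, branches)); // prints other
        System.out.println(eval(3, Arrays.<Object>asList(1, "one"))); // prints null
    }
}
```

Hoisting the null check out of the loop is what lets the diff delete `threeValueEquals`: once the key is known to be non-null, plain equality gives the same result.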
spark git commit: [SPARK-9027] [SQL] Generalize metastore predicate pushdown
Repository: spark
Updated Branches:
  refs/heads/master 59d820aa8 -> 37f2d9635


[SPARK-9027] [SQL] Generalize metastore predicate pushdown

Add support for pushing down metastore filters that are in different orders and add some unit tests.

Author: Michael Armbrust <mich...@databricks.com>

Closes #7386 from marmbrus/metastoreFilters and squashes the following commits:

05a4524 [Michael Armbrust] [SPARK-9027][SQL] Generalize metastore predicate pushdown


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37f2d963
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37f2d963
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37f2d963

Branch: refs/heads/master
Commit: 37f2d9635ff874fb8ad9d246e49faf6098d501c3
Parents: 59d820a
Author: Michael Armbrust <mich...@databricks.com>
Authored: Tue Jul 14 11:22:09 2015 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Tue Jul 14 11:22:09 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/hive/client/HiveShim.scala | 54 +++---
 .../spark/sql/hive/client/FiltersSuite.scala | 78
 2 files changed, 107 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/37f2d963/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 5542a52..d12778c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde.serdeConstants
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions.{Expression, AttributeReference, BinaryComparison}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{StringType, IntegralType}
 
 /**
@@ -312,37 +312,41 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
   override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
     getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].toSeq
 
-  override def getPartitionsByFilter(
-      hive: Hive,
-      table: Table,
-      predicates: Seq[Expression]): Seq[Partition] = {
+  /**
+   * Converts catalyst expression to the format that Hive's getPartitionsByFilter() expects, i.e.
+   * a string that represents partition predicates like "str_key=\"value\" and int_key=1"
+   *
+   * Unsupported predicates are skipped.
+   */
+  def convertFilters(table: Table, filters: Seq[Expression]): String = {
     // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
     val varcharKeys = table.getPartitionKeys
       .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME))
       .map(col => col.getName).toSet
 
-    // Hive getPartitionsByFilter() takes a string that represents partition
-    // predicates like "str_key=\"value\" and int_key=1 ..."
-    val filter = predicates.flatMap { expr =>
-      expr match {
-        case op @ BinaryComparison(lhs, rhs) => {
-          lhs match {
-            case AttributeReference(_, _, _, _) => {
-              rhs.dataType match {
-                case _: IntegralType =>
-                  Some(lhs.prettyString + op.symbol + rhs.prettyString)
-                case _: StringType if (!varcharKeys.contains(lhs.prettyString)) =>
-                  Some(lhs.prettyString + op.symbol + "\"" + rhs.prettyString + "\"")
-                case _ => None
-              }
-            }
-            case _ => None
-          }
-        }
-        case _ => None
-      }
+    filters.collect {
+      case op @ BinaryComparison(a: Attribute, Literal(v, _: IntegralType)) =>
+        s"${a.name} ${op.symbol} $v"
+      case op @ BinaryComparison(Literal(v, _: IntegralType), a: Attribute) =>
+        s"$v ${op.symbol} ${a.name}"
+
+      case op @ BinaryComparison(a: Attribute, Literal(v, _: StringType))
+          if !varcharKeys.contains(a.name) =>
+        s"""${a.name} ${op.symbol} "$v""""
+      case op @ BinaryComparison(Literal(v, _: StringType), a: Attribute)
+          if !varcharKeys.contains(a.name) =>
+        s""""$v" ${op.symbol} ${a.name}"""
     }.mkString(" and ")
+  }
+
+  override def getPartitionsByFilter(
+      hive: Hive,
+      table: Table,
+      predicates: Seq[Expression]): Seq[Partition] = {
+    // Hive getPartitionsByFilter() takes a string that represents partition
+    // predicates like "str_key=\"value\" and int_key=1 ..."
+    val filter = convertFilters(table, predicates)
     val partitions =
       if (filter.isEmpty) {
spark git commit: [SPARK-8718] [GRAPHX] Improve EdgePartition2D for non perfect square number of partitions
Repository: spark
Updated Branches:
  refs/heads/master d267c2834 -> 0a4071eab


[SPARK-8718] [GRAPHX] Improve EdgePartition2D for non perfect square number of partitions

See https://github.com/aray/e2d/blob/master/EdgePartition2D.ipynb

Author: Andrew Ray <ray.and...@gmail.com>

Closes #7104 from aray/edge-partition-2d-improvement and squashes the following commits:

3729f84 [Andrew Ray] correct bounds and remove unneeded comments
97f8464 [Andrew Ray] change less
5141ab4 [Andrew Ray] Merge branch 'master' into edge-partition-2d-improvement
925fd2c [Andrew Ray] use new interface for partitioning
001bfd0 [Andrew Ray] Refactor PartitionStrategy so that we can return a prtition function for a given number of parts. To keep compatibility we define default methods that translate between the two implementation options. Made EdgePartition2D use old strategy when we have a perfect square and implement new interface.
5d42105 [Andrew Ray] % -> /
3560084 [Andrew Ray] Merge branch 'master' into edge-partition-2d-improvement
f006364 [Andrew Ray] remove unneeded comments
cfa2c5e [Andrew Ray] Modifications to EdgePartition2D so that it works for non perfect squares.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a4071ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a4071ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a4071ea

Branch: refs/heads/master
Commit: 0a4071eab30db1db80f61ed2cb2e7243291183ce
Parents: d267c28
Author: Andrew Ray <ray.and...@gmail.com>
Authored: Tue Jul 14 13:14:47 2015 -0700
Committer: Ankur Dave <ankurd...@gmail.com>
Committed: Tue Jul 14 13:14:47 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/graphx/PartitionStrategy.scala | 32 +---
 1 file changed, 21 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0a4071ea/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 7372dfb..70a7592 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -32,7 +32,7 @@ trait PartitionStrategy extends Serializable {
 object PartitionStrategy {
   /**
    * Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix,
-   * guaranteeing a `2 * sqrt(numParts) - 1` bound on vertex replication.
+   * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication.
    *
    * Suppose we have a graph with 12 vertices that we want to partition
    * over 9 machines. We can use the following sparse matrix representation:
@@ -61,26 +61,36 @@ object PartitionStrategy {
    * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3,
    * P6)` or the last
    * row of blocks `(P6, P7, P8)`. As a consequence we can guarantee that `v11` will need to be
-   * replicated to at most `2 * sqrt(numParts) - 1` machines.
+   * replicated to at most `2 * sqrt(numParts)` machines.
    *
    * Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work
    * balance. To improve balance we first multiply each vertex id by a large prime to shuffle the
    * vertex locations.
    *
-   * One of the limitations of this approach is that the number of machines must either be a
-   * perfect square. We partially address this limitation by computing the machine assignment to
-   * the next
-   * largest perfect square and then mapping back down to the actual number of machines.
-   * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect
-   * square is used.
+   * When the number of partitions requested is not a perfect square we use a slightly different
+   * method where the last column can have a different number of rows than the others while still
+   * maintaining the same size per block.
    */
   case object EdgePartition2D extends PartitionStrategy {
     override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
       val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
       val mixingPrime: VertexId = 1125899906842597L
-      val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
-      val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
-      (col * ceilSqrtNumParts + row) % numParts
+      if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
+        // Use old method for perfect squared to ensure we get same results
+        val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
+        val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
+        (col * ceilSqrtNumParts + row) % numParts
+      } else {
+        // ... (the message is cut off at this point in the archive)
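The perfect-square case of `EdgePartition2D.getPartition`, which appears in the removed lines of the diff above, is small enough to port directly. The following is a minimal Java sketch of that case only; the branch for non-square partition counts is cut off in this message and is deliberately not reproduced. The class `EdgePartition2DSketch` is hypothetical, and rejecting non-square inputs is a choice of this sketch rather than the behaviour of the Spark code.

```java
// Hypothetical Java port of the perfect-square case only; not Spark code.
final class EdgePartition2DSketch {
    // Same constant as mixingPrime in the diff.
    private static final long MIXING_PRIME = 1125899906842597L;

    /**
     * Maps an edge (src, dst) to a partition in [0, numParts) for a perfect-square numParts.
     * Mirrors the source's arithmetic, including Math.abs, so a product that wraps to
     * exactly Long.MIN_VALUE stays negative; that corner case is not handled here.
     */
    static int getPartition(long src, long dst, int numParts) {
        int ceilSqrtNumParts = (int) Math.ceil(Math.sqrt(numParts));
        if (numParts != ceilSqrtNumParts * ceilSqrtNumParts) {
            throw new IllegalArgumentException(
                "this sketch only covers perfect squares, got " + numParts);
        }
        // The source vertex picks the column of blocks, the destination vertex the row.
        int col = (int) (Math.abs(src * MIXING_PRIME) % ceilSqrtNumParts);
        int row = (int) (Math.abs(dst * MIXING_PRIME) % ceilSqrtNumParts);
        return (col * ceilSqrtNumParts + row) % numParts;
    }

    public static void main(String[] args) {
        // All edges leaving vertex 11 share one column, so at most 3 of 9 partitions appear.
        for (long dst = 0; dst < 4; dst++) {
            System.out.println("edge (11, " + dst + ") -> P" + getPartition(11L, dst, 9));
        }
    }
}
```

Because a vertex's outgoing edges fall in one column of blocks and its incoming edges in one row, the vertex is replicated to at most about `2 * sqrt(numParts)` partitions, which is the bound quoted in the Scaladoc.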
spark git commit: [SPARK-4072] [CORE] Display Streaming blocks in Streaming UI
Repository: spark
Updated Branches:
  refs/heads/master 0a4071eab -> fb1d06fc2


[SPARK-4072] [CORE] Display Streaming blocks in Streaming UI

Replace #6634

This PR adds `SparkListenerBlockUpdated` to SparkListener so that it can monitor all block update infos that are sent to `BlockManagerMasaterEndpoint`, and also add new tables in the Storage tab to display the stream block infos.

![screen shot 2015-07-01 at 5 19 46 pm](https://cloud.githubusercontent.com/assets/1000778/8451562/c291a6ec-2016-11e5-890d-0afc174e1f8c.png)

Author: zsxwing <zsxw...@gmail.com>

Closes #6672 from zsxwing/SPARK-4072-2 and squashes the following commits:

df2c1d8 [zsxwing] Use xml query to check the xml elements
54d54af [zsxwing] Add unit tests for StoragePage
e29fb53 [zsxwing] Update as per TD's comments
ccbee07 [zsxwing] Fix the code style
6dc42b4 [zsxwing] Fix the replication level of blocks
450fad1 [zsxwing] Merge branch 'master' into SPARK-4072-2
1e9ef52 [zsxwing] Don't categorize by Executor ID
ca0ab69 [zsxwing] Fix the code style
3de2762 [zsxwing] Make object BlockUpdatedInfo private
e95b594 [zsxwing] Add 'Aggregated Stream Block Metrics by Executor' table
ba5d0d1 [zsxwing] Refactor the unit test to improve the readability
4bbe341 [zsxwing] Revert JsonProtocol and don't log SparkListenerBlockUpdated
b464dd1 [zsxwing] Add onBlockUpdated to EventLoggingListener
5ba014c [zsxwing] Fix the code style
0b1e47b [zsxwing] Add a developer api BlockUpdatedInfo
04838a9 [zsxwing] Fix the code style
2baa161 [zsxwing] Add unit tests
80f6c6d [zsxwing] Address comments
797ee4b [zsxwing] Display Streaming blocks in Streaming UI


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb1d06fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb1d06fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb1d06fc

Branch: refs/heads/master
Commit: fb1d06fc242ec00320f1a3049673fbb03c4a6eb9
Parents: 0a4071e
Author: zsxwing <zsxw...@gmail.com>
Authored: Tue Jul 14 13:58:36 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Jul 14 13:58:36 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/JavaSparkListener.java | 22 +-
 .../org/apache/spark/SparkFirehoseListener.java | 6 +
 .../spark/scheduler/EventLoggingListener.scala | 3 +
 .../apache/spark/scheduler/SparkListener.scala | 10 +-
 .../spark/scheduler/SparkListenerBus.scala | 2 +
 .../storage/BlockManagerMasterEndpoint.scala | 3 +-
 .../spark/storage/BlockStatusListener.scala | 105 +
 .../apache/spark/storage/BlockUpdatedInfo.scala | 47
 .../scala/org/apache/spark/ui/UIUtils.scala | 14 +-
 .../apache/spark/ui/storage/StoragePage.scala | 148 +++-
 .../apache/spark/ui/storage/StorageTab.scala | 3 +-
 .../storage/BlockStatusListenerSuite.scala | 119 ++
 .../spark/ui/storage/StoragePageSuite.scala | 230 +++
 13 files changed, 684 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fb1d06fc/core/src/main/java/org/apache/spark/JavaSparkListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/JavaSparkListener.java b/core/src/main/java/org/apache/spark/JavaSparkListener.java
index 646496f..fa9acf0 100644
--- a/core/src/main/java/org/apache/spark/JavaSparkListener.java
+++ b/core/src/main/java/org/apache/spark/JavaSparkListener.java
@@ -17,23 +17,7 @@
 
 package org.apache.spark;
 
-import org.apache.spark.scheduler.SparkListener;
-import org.apache.spark.scheduler.SparkListenerApplicationEnd;
-import org.apache.spark.scheduler.SparkListenerApplicationStart;
-import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
-import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
-import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
-import org.apache.spark.scheduler.SparkListenerExecutorAdded;
-import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
-import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
-import org.apache.spark.scheduler.SparkListenerJobEnd;
-import org.apache.spark.scheduler.SparkListenerJobStart;
-import org.apache.spark.scheduler.SparkListenerStageCompleted;
-import org.apache.spark.scheduler.SparkListenerStageSubmitted;
-import org.apache.spark.scheduler.SparkListenerTaskEnd;
-import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
-import org.apache.spark.scheduler.SparkListenerTaskStart;
-import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
+import org.apache.spark.scheduler.*;
 
 /**
  * Java clients should extend this class instead of implementing
@@ -94,4 +78,8 @@ public class JavaSparkListener implements SparkListener {
   @Override
   public void
spark git commit: [SPARK-4362] [MLLIB] Make prediction probability available in NaiveBayesModel
Repository: spark
Updated Branches:
  refs/heads/master 4b5cfc988 -> 740b034f1


[SPARK-4362] [MLLIB] Make prediction probability available in NaiveBayesModel

Add predictProbabilities to Naive Bayes, return class probabilities.

Continues https://github.com/apache/spark/pull/6761

Author: Sean Owen <so...@cloudera.com>

Closes #7376 from srowen/SPARK-4362 and squashes the following commits:

23d5a76 [Sean Owen] Fix model.labels -> model.theta
95d91fb [Sean Owen] Check that predicted probabilities sum to 1
b32d1c8 [Sean Owen] Add predictProbabilities to Naive Bayes, return class probabilities


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/740b034f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/740b034f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/740b034f

Branch: refs/heads/master
Commit: 740b034f1ca885a386f5a9ef7e0c81c714b047ff
Parents: 4b5cfc9
Author: Sean Owen <so...@cloudera.com>
Authored: Tue Jul 14 22:44:54 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Jul 14 22:44:54 2015 +0100

----------------------------------------------------------------------
 .../spark/mllib/classification/NaiveBayes.scala | 76 +++-
 .../mllib/classification/NaiveBayesSuite.scala  | 55 +-
 2 files changed, 113 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/740b034f/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index f51ee36..9e379d7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -93,26 +93,70 @@ class NaiveBayesModel private[mllib] (
   override def predict(testData: Vector): Double = {
     modelType match {
       case Multinomial =>
-        val prob = thetaMatrix.multiply(testData)
-        BLAS.axpy(1.0, piVector, prob)
-        labels(prob.argmax)
+        labels(multinomialCalculation(testData).argmax)
       case Bernoulli =>
-        testData.foreachActive { (index, value) =>
-          if (value != 0.0 && value != 1.0) {
-            throw new SparkException(
-              s"Bernoulli naive Bayes requires 0 or 1 feature values but found $testData.")
-          }
-        }
-        val prob = thetaMinusNegTheta.get.multiply(testData)
-        BLAS.axpy(1.0, piVector, prob)
-        BLAS.axpy(1.0, negThetaSum.get, prob)
-        labels(prob.argmax)
-      case _ =>
-        // This should never happen.
-        throw new UnknownError(s"Invalid modelType: $modelType.")
+        labels(bernoulliCalculation(testData).argmax)
+    }
+  }
+
+  /**
+   * Predict values for the given data set using the model trained.
+   *
+   * @param testData RDD representing data points to be predicted
+   * @return an RDD[Vector] where each entry contains the predicted posterior class probabilities,
+   *         in the same order as class labels
+   */
+  def predictProbabilities(testData: RDD[Vector]): RDD[Vector] = {
+    val bcModel = testData.context.broadcast(this)
+    testData.mapPartitions { iter =>
+      val model = bcModel.value
+      iter.map(model.predictProbabilities)
     }
   }

+  /**
+   * Predict posterior class probabilities for a single data point using the model trained.
+   *
+   * @param testData array representing a single data point
+   * @return predicted posterior class probabilities from the trained model,
+   *         in the same order as class labels
+   */
+  def predictProbabilities(testData: Vector): Vector = {
+    modelType match {
+      case Multinomial =>
+        posteriorProbabilities(multinomialCalculation(testData))
+      case Bernoulli =>
+        posteriorProbabilities(bernoulliCalculation(testData))
+    }
+  }
+
+  private def multinomialCalculation(testData: Vector) = {
+    val prob = thetaMatrix.multiply(testData)
+    BLAS.axpy(1.0, piVector, prob)
+    prob
+  }
+
+  private def bernoulliCalculation(testData: Vector) = {
+    testData.foreachActive((_, value) =>
+      if (value != 0.0 && value != 1.0) {
+        throw new SparkException(
+          s"Bernoulli naive Bayes requires 0 or 1 feature values but found $testData.")
+      }
+    )
+    val prob = thetaMinusNegTheta.get.multiply(testData)
+    BLAS.axpy(1.0, piVector, prob)
+    BLAS.axpy(1.0, negThetaSum.get, prob)
+    prob
+  }
+
+  private def posteriorProbabilities(logProb: DenseVector) = {
+    val logProbArray = logProb.toArray
+    val maxLog = logProbArray.max
+    val scaledProbs = logProbArray.map(lp => math.exp(lp - maxLog))
+    val probSum = scaledProbs.sum
+    new
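The `posteriorProbabilities` helper in the SPARK-4362 diff above subtracts the largest log-probability before exponentiating, so that very negative log values do not underflow to zero. The following is a minimal standalone sketch of that idea in Java. The class and method names (`PosteriorSketch`, `normalize`) are hypothetical and not part of MLlib, and because the archived message is cut off, the final step (dividing by the sum) is an assumption based on the commit note that the probabilities should sum to 1.

```java
/** Hypothetical illustration; not the MLlib implementation. */
final class PosteriorSketch {
    private PosteriorSketch() {}

    /** Converts unnormalized log-probabilities into probabilities that sum to 1. */
    static double[] normalize(double[] logProb) {
        // Find the largest log-probability so the biggest exponent becomes exp(0) = 1.
        double maxLog = Double.NEGATIVE_INFINITY;
        for (double lp : logProb) {
            maxLog = Math.max(maxLog, lp);
        }
        // Exponentiate the shifted values and accumulate their sum.
        double[] probs = new double[logProb.length];
        double sum = 0.0;
        for (int i = 0; i < logProb.length; i++) {
            probs[i] = Math.exp(logProb[i] - maxLog);
            sum += probs[i];
        }
        // Assumed final step: divide by the sum so the entries add up to 1.
        for (int i = 0; i < probs.length; i++) {
            probs[i] /= sum;
        }
        return probs;
    }
}
```

Without the shift, an input such as `{-1000.0, -1001.0, -1002.0}` would exponentiate to all zeros in double precision and the division would produce NaN; with the shift the ratios between classes are preserved.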
spark git commit: [SPARK-9031] Merge BlockObjectWriter and DiskBlockObject writer to remove abstract class
Repository: spark Updated Branches: refs/heads/master 8fb3a65cb - d267c2834 [SPARK-9031] Merge BlockObjectWriter and DiskBlockObject writer to remove abstract class BlockObjectWriter has only one concrete non-test class, DiskBlockObjectWriter. In order to simplify the code in preparation for other refactorings, I think that we should remove this base class and have only DiskBlockObjectWriter. While at one time we may have planned to have multiple BlockObjectWriter implementations, that doesn't seem to have happened, so the extra abstraction seems unnecessary. Author: Josh Rosen joshro...@databricks.com Closes #7391 from JoshRosen/shuffle-write-interface-refactoring and squashes the following commits: c418e33 [Josh Rosen] Fix compilation 5047995 [Josh Rosen] Fix comments d5dc548 [Josh Rosen] Update references in comments 89dc797 [Josh Rosen] Rename test suite. 5755918 [Josh Rosen] Remove unnecessary val in case class 1607c91 [Josh Rosen] Merge BlockObjectWriter and DiskBlockObjectWriter Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d267c283 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d267c283 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d267c283 Branch: refs/heads/master Commit: d267c2834a639aaebd0559355c6a82613abb689b Parents: 8fb3a65 Author: Josh Rosen joshro...@databricks.com Authored: Tue Jul 14 12:56:17 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Tue Jul 14 12:56:17 2015 -0700 -- .../sort/BypassMergeSortShuffleWriter.java | 8 +- .../unsafe/UnsafeShuffleExternalSorter.java | 2 +- .../unsafe/sort/UnsafeSorterSpillWriter.java| 4 +- .../shuffle/FileShuffleBlockResolver.scala | 8 +- .../shuffle/IndexShuffleBlockResolver.scala | 2 +- .../spark/shuffle/hash/HashShuffleWriter.scala | 4 +- .../org/apache/spark/storage/BlockManager.scala | 2 +- .../spark/storage/BlockObjectWriter.scala | 256 --- .../spark/storage/DiskBlockObjectWriter.scala | 234 + 
.../spark/util/collection/ChainedBuffer.scala | 2 +- .../spark/util/collection/ExternalSorter.scala | 4 +- .../util/collection/PartitionedPairBuffer.scala | 1 - .../PartitionedSerializedPairBuffer.scala | 5 +- .../WritablePartitionedPairCollection.scala | 8 +- .../BypassMergeSortShuffleWriterSuite.scala | 4 +- .../spark/storage/BlockObjectWriterSuite.scala | 173 - .../storage/DiskBlockObjectWriterSuite.scala| 173 + .../PartitionedSerializedPairBufferSuite.scala | 52 ++-- 18 files changed, 459 insertions(+), 483 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d267c283/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java -- diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index d3d6280..0b8b604 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -75,7 +75,7 @@ final class BypassMergeSortShuffleWriterK, V implements SortShuffleFileWriter private final Serializer serializer; /** Array of file writers, one for each partition */ - private BlockObjectWriter[] partitionWriters; + private DiskBlockObjectWriter[] partitionWriters; public BypassMergeSortShuffleWriter( SparkConf conf, @@ -101,7 +101,7 @@ final class BypassMergeSortShuffleWriterK, V implements SortShuffleFileWriter } final SerializerInstance serInstance = serializer.newInstance(); final long openStartTime = System.nanoTime(); -partitionWriters = new BlockObjectWriter[numPartitions]; +partitionWriters = new DiskBlockObjectWriter[numPartitions]; for (int i = 0; i numPartitions; i++) { final Tuple2TempShuffleBlockId, File tempShuffleBlockIdPlusFile = blockManager.diskBlockManager().createTempShuffleBlock(); @@ -121,7 +121,7 @@ final class BypassMergeSortShuffleWriterK, V implements SortShuffleFileWriter 
partitionWriters[partitioner.getPartition(key)].write(key, record._2()); } -for (BlockObjectWriter writer : partitionWriters) { +for (DiskBlockObjectWriter writer : partitionWriters) { writer.commitAndClose(); } } @@ -169,7 +169,7 @@ final class BypassMergeSortShuffleWriterK, V implements SortShuffleFileWriter if (partitionWriters != null) { try { final DiskBlockManager diskBlockManager = blockManager.diskBlockManager(); -for (BlockObjectWriter writer : partitionWriters) { +
spark git commit: [SPARK-8911] Fix local mode endless heartbeats
Repository: spark
Updated Branches:
  refs/heads/master c4e98ff06 -> 8fb3a65cb


[SPARK-8911] Fix local mode endless heartbeats

As of #7173 we expect executors to properly register with the driver before responding to their heartbeats. This behavior is not matched in local mode. This patch adds the missing event that needs to be posted.

Author: Andrew Or <and...@databricks.com>

Closes #7382 from andrewor14/fix-local-heartbeat and squashes the following commits:

1258bdf [Andrew Or] Post ExecutorAdded event to local executor


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8fb3a65c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8fb3a65c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8fb3a65c

Branch: refs/heads/master
Commit: 8fb3a65cbb714120d612e58ef9d12b0521a83260
Parents: c4e98ff
Author: Andrew Or <and...@databricks.com>
Authored: Tue Jul 14 12:47:11 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Tue Jul 14 12:47:11 2015 -0700

----------------------------------------------------------------------
 .../spark/scheduler/local/LocalBackend.scala    | 20 +---
 1 file changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8fb3a65c/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 776e5d3..4d48fcf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -25,7 +25,8 @@ import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
-import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo

 private case class ReviveOffers()

@@ -50,8 +51,8 @@ private[spark] class LocalEndpoint(

   private var freeCores = totalCores

-  private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
-  private val localExecutorHostname = "localhost"
+  val localExecutorId = SparkContext.DRIVER_IDENTIFIER
+  val localExecutorHostname = "localhost"

   private val executor = new Executor(
     localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)
@@ -99,8 +100,9 @@ private[spark] class LocalBackend(
   extends SchedulerBackend with ExecutorBackend with Logging {

   private val appId = "local-" + System.currentTimeMillis
-  var localEndpoint: RpcEndpointRef = null
+  private var localEndpoint: RpcEndpointRef = null
   private val userClassPath = getUserClasspath(conf)
+  private val listenerBus = scheduler.sc.listenerBus

   /**
    * Returns a list of URLs representing the user classpath.
@@ -113,9 +115,13 @@ private[spark] class LocalBackend(
   }

   override def start() {
-    localEndpoint = SparkEnv.get.rpcEnv.setupEndpoint(
-      "LocalBackendEndpoint",
-      new LocalEndpoint(SparkEnv.get.rpcEnv, userClassPath, scheduler, this, totalCores))
+    val rpcEnv = SparkEnv.get.rpcEnv
+    val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)
+    localEndpoint = rpcEnv.setupEndpoint("LocalBackendEndpoint", executorEndpoint)
+    listenerBus.post(SparkListenerExecutorAdded(
+      System.currentTimeMillis,
+      executorEndpoint.localExecutorId,
+      new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty)))
   }

   override def stop() {
spark git commit: [SPARK-8800] [SQL] Fix inaccurate precision/scale of Decimal division operation
Repository: spark
Updated Branches:
  refs/heads/master fb1d06fc2 -> 4b5cfc988


[SPARK-8800] [SQL] Fix inaccurate precision/scale of Decimal division operation

JIRA: https://issues.apache.org/jira/browse/SPARK-8800

Previously, we turn to Java BigDecimal's divide with specified ROUNDING_MODE to avoid non-terminating decimal expansion problem. However, as JihongMA reported, for the division operation on some specific values, we get inaccurate results.

Author: Liang-Chi Hsieh <vii...@gmail.com>

Closes #7212 from viirya/fix_decimal4 and squashes the following commits:

4205a0a [Liang-Chi Hsieh] Fix inaccuracy precision/scale of Decimal division operation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b5cfc98
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b5cfc98
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b5cfc98

Branch: refs/heads/master
Commit: 4b5cfc988f23988c2334882a255d494fc93d252e
Parents: fb1d06f
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Tue Jul 14 14:19:27 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Jul 14 14:19:27 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/types/Decimal.scala    | 14 +++---
 .../apache/spark/sql/types/decimal/DecimalSuite.scala | 10 +-
 2 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4b5cfc98/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index 5a16948..f5bd068 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -145,6 +145,14 @@ final class Decimal extends Ordered[Decimal] with Serializable {
     }
   }

+  def toLimitedBigDecimal: BigDecimal = {
+    if (decimalVal.ne(null)) {
+      decimalVal
+    } else {
+      BigDecimal(longVal, _scale)
+    }
+  }
+
   def toJavaBigDecimal: java.math.BigDecimal = toBigDecimal.underlying()

   def toUnscaledLong: Long = {
@@ -269,9 +277,9 @@ final class Decimal extends Ordered[Decimal] with Serializable {
     if (that.isZero) {
       null
     } else {
-      // To avoid non-terminating decimal expansion problem, we turn to Java BigDecimal's divide
-      // with specified ROUNDING_MODE.
-      Decimal(toJavaBigDecimal.divide(that.toJavaBigDecimal, ROUNDING_MODE.id))
+      // To avoid non-terminating decimal expansion problem, we get scala's BigDecimal with limited
+      // precision and scala.
+      Decimal(toLimitedBigDecimal / that.toLimitedBigDecimal)
     }
   }


http://git-wip-us.apache.org/repos/asf/spark/blob/4b5cfc98/sql/catalyst/src/test/scala/org/apache/spark/sql/types/decimal/DecimalSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/decimal/DecimalSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/decimal/DecimalSuite.scala
index 5f31296..030bb6d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/decimal/DecimalSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/decimal/DecimalSuite.scala
@@ -170,6 +170,14 @@ class DecimalSuite extends SparkFunSuite with PrivateMethodTester {

   test("fix non-terminating decimal expansion problem") {
     val decimal = Decimal(1.0, 10, 3) / Decimal(3.0, 10, 3)
-    assert(decimal.toString === "0.333")
+    // The difference between decimal should not be more than 0.001.
+    assert(decimal.toDouble - 0.333 < 0.001)
+  }
+
+  test("fix loss of precision/scale when doing division operation") {
+    val a = Decimal(2) / Decimal(3)
+    assert(a.toDouble < 1.0 && a.toDouble > 0.6)
+    val b = Decimal(1) / Decimal(8)
+    assert(b.toDouble === 0.125)
   }
 }
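The "non-terminating decimal expansion problem" mentioned in the SPARK-8800 message can be reproduced with plain `java.math.BigDecimal`: an exact `divide` throws when the quotient has no finite decimal representation, while a `divide` bounded by a `MathContext` rounds to a fixed number of digits. The sketch below is only an illustration of that behavior. `DecimalDivisionSketch` and its method names are hypothetical, and the choice of `MathContext.DECIMAL128` is an assumption (it is, to my knowledge, the default precision of Scala's `BigDecimal`, which is what the patched code divides with).

```java
import java.math.BigDecimal;
import java.math.MathContext;

/** Hypothetical illustration; not Spark's Decimal class. */
final class DecimalDivisionSketch {
    private DecimalDivisionSketch() {}

    /**
     * Returns true if the exact quotient cannot be represented, i.e. BigDecimal.divide
     * throws ArithmeticException (this also happens when b is zero).
     */
    static boolean exactDivisionFails(BigDecimal a, BigDecimal b) {
        try {
            a.divide(b);
            return false;
        } catch (ArithmeticException e) {
            return true;
        }
    }

    /** Divides with a bounded precision (34 significant digits, half-even rounding). */
    static BigDecimal limitedDivide(BigDecimal a, BigDecimal b) {
        return a.divide(b, MathContext.DECIMAL128);
    }
}
```

With these helpers, 1/8 divides exactly to 0.125, whereas 1/3 and 2/3 only succeed through `limitedDivide`, which matches the two cases exercised in the `DecimalSuite` changes.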
Git Push Summary
Repository: spark
Updated Tags:  refs/tags/v1.4.1 [created] dbaa5c294
spark git commit: [SPARK-5523] [CORE] [STREAMING] Add a cache for hostname in TaskMetrics to decrease the memory usage and GC overhead
Repository: spark
Updated Branches:
  refs/heads/master f957796c4 -> bb870e72f


[SPARK-5523] [CORE] [STREAMING] Add a cache for hostname in TaskMetrics to decrease the memory usage and GC overhead

Hostname in TaskMetrics will be created through deserialization, mostly the number of hostname is only the order of number of cluster node, so adding a cache layer to dedup the object could reduce the memory usage and alleviate GC overhead, especially for long-running and fast job generation applications like Spark Streaming.

Author: jerryshao <saisai.s...@intel.com>
Author: Saisai Shao <saisai.s...@intel.com>

Closes #5064 from jerryshao/SPARK-5523 and squashes the following commits:

3e2412a [jerryshao] Address the comments
b092a81 [Saisai Shao] Add a pool to cache the hostname


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb870e72
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb870e72
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb870e72

Branch: refs/heads/master
Commit: bb870e72f42b6ce8d056df259f6fcf41808d7ed2
Parents: f957796
Author: jerryshao <saisai.s...@intel.com>
Authored: Tue Jul 14 19:54:02 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Jul 14 19:54:02 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/executor/TaskMetrics.scala | 20
 1 file changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bb870e72/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index a3b4561..e80feee 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,11 +17,15 @@

 package org.apache.spark.executor

+import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentHashMap
+
 import scala.collection.mutable.ArrayBuffer

 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.DataReadMethod.DataReadMethod
 import org.apache.spark.storage.{BlockId, BlockStatus}
+import org.apache.spark.util.Utils

 /**
  * :: DeveloperApi ::
@@ -210,10 +214,26 @@ class TaskMetrics extends Serializable {
   private[spark] def updateInputMetrics(): Unit = synchronized {
     inputMetrics.foreach(_.updateBytesRead())
   }
+
+  @throws(classOf[IOException])
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
+    in.defaultReadObject()
+    // Get the hostname from cached data, since hostname is the order of number of nodes in
+    // cluster, so using cached hostname will decrease the object number and alleviate the GC
+    // overhead.
+    _hostname = TaskMetrics.getCachedHostName(_hostname)
+  }
 }

 private[spark] object TaskMetrics {
+  private val hostNameCache = new ConcurrentHashMap[String, String]()
+
   def empty: TaskMetrics = new TaskMetrics
+
+  def getCachedHostName(host: String): String = {
+    val canonicalHost = hostNameCache.putIfAbsent(host, host)
+    if (canonicalHost != null) canonicalHost else host
+  }
 }

 /**
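The hostname cache in the SPARK-5523 diff above relies on `ConcurrentHashMap.putIfAbsent`, which returns the previously stored value if the key is already present and `null` otherwise. A minimal Java sketch of the same deduplication pattern follows; `HostnameCacheSketch` and `canonical` are hypothetical names chosen for illustration, not Spark identifiers.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical illustration of the putIfAbsent-based deduplication pattern. */
final class HostnameCacheSketch {
    // Maps each distinct hostname to the first String instance seen with that value.
    private static final ConcurrentHashMap<String, String> CACHE = new ConcurrentHashMap<>();

    private HostnameCacheSketch() {}

    /** Returns the cached instance equal to host, registering host itself if none exists yet. */
    static String canonical(String host) {
        // putIfAbsent returns the existing mapping, or null if host was just inserted.
        String previous = CACHE.putIfAbsent(host, host);
        return previous != null ? previous : host;
    }
}
```

Every deserialized copy of the same hostname then resolves to one shared `String` object, so the duplicates become garbage immediately instead of being retained per task. The cache grows only with the number of distinct hostnames, which the commit message expects to be on the order of the cluster size.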
svn commit: r9826 - /dev/spark/spark-1.4.1-rc4-bin/ /release/spark/spark-1.4.1/
Author: pwendell
Date: Wed Jul 15 03:29:55 2015
New Revision: 9826

Log:
Spark release 1.4.1

Added:
    release/spark/spark-1.4.1/
      - copied from r9825, dev/spark/spark-1.4.1-rc4-bin/
Removed:
    dev/spark/spark-1.4.1-rc4-bin/
svn commit: r9825 - in /dev/spark/spark-1.4.1-rc4-bin: spark-1.4.1-bin-hadoop2.4-without-hive.tgz spark-1.4.1-bin-hadoop2.4-without-hive.tgz.asc spark-1.4.1-bin-hadoop2.4-without-hive.tgz.md5 spark-1.
Author: pwendell
Date: Wed Jul 15 03:28:40 2015
New Revision: 9825

Log:
Removing hive developer build

Removed:
    dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.4-without-hive.tgz
    dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.4-without-hive.tgz.asc
    dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.4-without-hive.tgz.md5
    dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.4-without-hive.tgz.sha
spark git commit: [SPARK-9045] Fix Scala 2.11 build break in UnsafeExternalRowSorter
Repository: spark
Updated Branches:
  refs/heads/master 11e5c3728 -> e965a798d


[SPARK-9045] Fix Scala 2.11 build break in UnsafeExternalRowSorter

This fixes a compilation break in under Scala 2.11:

```
[error] /home/jenkins/workspace/Spark-Master-Scala211-Compile/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java:135: error: <anonymous org.apache.spark.sql.execution.UnsafeExternalRowSorter$1> is not abstract and does not override abstract method <B>minBy(Function1<InternalRow,B>,Ordering<B>) in TraversableOnce
[error]       return new AbstractScalaRowIterator() {
[error]                                             ^
[error]   where B,A are type-variables:
[error]     B extends Object declared in method <B>minBy(Function1<A,B>,Ordering<B>)
[error]     A extends Object declared in interface TraversableOnce
[error] 1 error
```

The workaround for this is to make `AbstractScalaRowIterator` into a concrete class.

Author: Josh Rosen <joshro...@databricks.com>

Closes #7405 from JoshRosen/SPARK-9045 and squashes the following commits:

cbcbb4c [Josh Rosen] Forgot that we can't use the ??? operator anymore
577ba60 [Josh Rosen] [SPARK-9045] Fix Scala 2.11 build break in UnsafeExternalRowSorter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e965a798
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e965a798
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e965a798

Branch: refs/heads/master
Commit: e965a798d09a9fba61b104c5cc0b65cdc28d27f6
Parents: 11e5c37
Author: Josh Rosen <joshro...@databricks.com>
Authored: Tue Jul 14 17:21:48 2015 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Tue Jul 14 17:21:48 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/AbstractScalaRowIterator.scala | 11 +++
 1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e965a798/sql/catalyst/src/main/scala/org/apache/spark/sql/AbstractScalaRowIterator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/AbstractScalaRowIterator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/AbstractScalaRowIterator.scala
index cfefb13..1090bdb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/AbstractScalaRowIterator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AbstractScalaRowIterator.scala
@@ -17,11 +17,14 @@

 package org.apache.spark.sql

-import org.apache.spark.sql.catalyst.InternalRow
-
 /**
  * Shim to allow us to implement [[scala.Iterator]] in Java. Scala 2.11+ has an AbstractIterator
  * class for this, but that class is `private[scala]` in 2.10. We need to explicitly fix this to
- * `Row` in order to work around a spurious IntelliJ compiler error.
+ * `Row` in order to work around a spurious IntelliJ compiler error. This cannot be an abstract
+ * class because that leads to compilation errors under Scala 2.11.
  */
-private[spark] abstract class AbstractScalaRowIterator extends Iterator[InternalRow]
+private[spark] class AbstractScalaRowIterator[T] extends Iterator[T] {
+  override def hasNext: Boolean = throw new NotImplementedError
+
+  override def next(): T = throw new NotImplementedError
+}
svn commit: r9824 - /dev/spark/spark-1.4.1-rc4-bin/
Author: pwendell Date: Wed Jul 15 03:25:59 2015 New Revision: 9824 Log: Adding Spark 1.4.1 RC4 Added: dev/spark/spark-1.4.1-rc4-bin/ dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-cdh4.tgz (with props) dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-cdh4.tgz.asc (with props) dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-cdh4.tgz.md5 dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-cdh4.tgz.sha dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop1-scala2.11.tgz (with props) dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop1-scala2.11.tgz.asc (with props) dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop1-scala2.11.tgz.md5 dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop1-scala2.11.tgz.sha dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop1.tgz (with props) dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop1.tgz.asc (with props) dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop1.tgz.md5 dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop1.tgz.sha dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.3.tgz (with props) dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.3.tgz.asc (with props) dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.3.tgz.md5 dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.3.tgz.sha dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.4-without-hive.tgz (with props) dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.4-without-hive.tgz.asc (with props) dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.4-without-hive.tgz.md5 dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.4-without-hive.tgz.sha dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.4.tgz (with props) dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.4.tgz.asc (with props) dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.4.tgz.md5 dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.4.tgz.sha dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.6.tgz (with props) dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.6.tgz.asc (with 
props) dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.6.tgz.md5 dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop2.6.tgz.sha dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-without-hadoop.tgz (with props) dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-without-hadoop.tgz.asc (with props) dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-without-hadoop.tgz.md5 dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-without-hadoop.tgz.sha dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1.tgz (with props) dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1.tgz.asc (with props) dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1.tgz.md5 dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1.tgz.sha Added: dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-cdh4.tgz == Binary file - no diff available. Propchange: dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-cdh4.tgz -- svn:mime-type = application/x-gzip Added: dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-cdh4.tgz.asc == Binary file - no diff available. Propchange: dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-cdh4.tgz.asc -- svn:mime-type = application/pgp-signature Added: dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-cdh4.tgz.md5 == --- dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-cdh4.tgz.md5 (added) +++ dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-cdh4.tgz.md5 Wed Jul 15 03:25:59 2015 @@ -0,0 +1 @@ +spark-1.4.1-bin-cdh4.tgz: 49 B9 4C 92 1B 82 36 3D 2D 7F 88 20 9D 0A 70 A7 Added: dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-cdh4.tgz.sha == --- dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-cdh4.tgz.sha (added) +++ dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-cdh4.tgz.sha Wed Jul 15 03:25:59 2015 @@ -0,0 +1,3 @@ +spark-1.4.1-bin-cdh4.tgz: EDD359E8 2B0516AB 611ADB14 BC0A1E4B 292F43BB 0407B7A0 + 96C166BD DBAB87DE 4BE08544 09F6F862 953E326F E782749D + 50EC29C1 B65076A6 FD62C9E5 89156D26 Added: dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop1-scala2.11.tgz == Binary file - no diff available. 
Propchange: dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop1-scala2.11.tgz -- svn:mime-type = application/x-gzip Added: dev/spark/spark-1.4.1-rc4-bin/spark-1.4.1-bin-hadoop1-scala2.11.tgz.asc
spark git commit: [SPARK-8962] Add Scalastyle rule to ban direct use of Class.forName; fix existing uses
Repository: spark
Updated Branches:
  refs/heads/master 740b034f1 -> 11e5c3728


[SPARK-8962] Add Scalastyle rule to ban direct use of Class.forName; fix existing uses

This pull request adds a Scalastyle regex rule which fails the style check if `Class.forName` is used directly. `Class.forName` always loads classes from the default / system classloader, but in a majority of cases, we should be using Spark's own `Utils.classForName` instead, which tries to load classes from the current thread's context classloader and falls back to the classloader which loaded Spark when the context classloader is not defined.

Author: Josh Rosen <joshro...@databricks.com>

Closes #7350 from JoshRosen/ban-Class.forName and squashes the following commits:

e3e96f7 [Josh Rosen] Merge remote-tracking branch 'origin/master' into ban-Class.forName
c0b7885 [Josh Rosen] Hopefully fix the last two cases
d707ba7 [Josh Rosen] Fix uses of Class.forName that I missed in my first cleanup pass
046470d [Josh Rosen] Merge remote-tracking branch 'origin/master' into ban-Class.forName
62882ee [Josh Rosen] Fix uses of Class.forName or add exclusion.
d9abade [Josh Rosen] Add stylechecker rule to ban uses of Class.forName Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/11e5c372 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/11e5c372 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/11e5c372 Branch: refs/heads/master Commit: 11e5c372862ec00e57460b37ccfee51c6d93c5f7 Parents: 740b034 Author: Josh Rosen joshro...@databricks.com Authored: Tue Jul 14 16:08:17 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Tue Jul 14 16:08:17 2015 -0700 -- .../src/main/scala/org/apache/spark/Logging.scala | 2 +- .../scala/org/apache/spark/SparkContext.scala | 11 +-- .../main/scala/org/apache/spark/SparkEnv.scala| 2 +- .../org/apache/spark/api/r/RBackendHandler.scala | 18 ++ .../apache/spark/broadcast/BroadcastManager.scala | 3 ++- .../org/apache/spark/deploy/SparkHadoopUtil.scala | 4 ++-- .../org/apache/spark/deploy/SparkSubmit.scala | 2 +- .../spark/deploy/SparkSubmitArguments.scala | 2 +- .../spark/deploy/history/HistoryServer.scala | 2 +- .../org/apache/spark/deploy/master/Master.scala | 2 +- .../deploy/rest/SubmitRestProtocolMessage.scala | 2 +- .../spark/deploy/worker/DriverWrapper.scala | 2 +- .../spark/deploy/worker/WorkerArguments.scala | 2 ++ .../org/apache/spark/executor/Executor.scala | 2 +- .../org/apache/spark/io/CompressionCodec.scala| 3 +-- .../spark/mapred/SparkHadoopMapRedUtil.scala | 5 +++-- .../mapreduce/SparkHadoopMapReduceUtil.scala | 9 + .../org/apache/spark/metrics/MetricsSystem.scala | 6 -- .../scala/org/apache/spark/rdd/HadoopRDD.scala| 6 +++--- .../main/scala/org/apache/spark/rpc/RpcEnv.scala | 3 +-- .../apache/spark/serializer/JavaSerializer.scala | 5 - .../apache/spark/serializer/KryoSerializer.scala | 2 ++ .../spark/serializer/SerializationDebugger.scala | 2 ++ .../apache/spark/storage/ExternalBlockStore.scala | 2 +- .../org/apache/spark/util/ClosureCleaner.scala| 2 ++ 
.../org/apache/spark/util/SizeEstimator.scala | 2 ++ .../main/scala/org/apache/spark/util/Utils.scala | 11 +-- .../test/scala/org/apache/spark/FileSuite.scala | 2 ++ .../SparkContextSchedulerCreationSuite.scala | 3 ++- .../apache/spark/deploy/SparkSubmitSuite.scala| 4 ++-- .../scala/org/apache/spark/rdd/JdbcRDDSuite.scala | 3 ++- .../KryoSerializerDistributedSuite.scala | 2 ++ .../spark/util/MutableURLClassLoaderSuite.scala | 2 ++ .../spark/streaming/flume/sink/Logging.scala | 2 ++ .../apache/spark/graphx/util/BytecodeUtils.scala | 2 +- .../scala/org/apache/spark/repl/SparkIMain.scala | 2 ++ scalastyle-config.xml | 11 +++ .../org/apache/spark/sql/types/DataType.scala | 3 ++- .../scala/org/apache/spark/sql/SQLContext.scala | 3 +-- .../spark/sql/parquet/ParquetRelation.scala | 7 --- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 3 ++- .../apache/spark/sql/jdbc/JDBCWriteSuite.scala| 3 ++- .../thriftserver/HiveThriftServer2Suites.scala| 2 +- .../org/apache/spark/sql/hive/TableReader.scala | 4 +--- .../spark/sql/hive/client/ClientWrapper.scala | 9 - .../spark/sql/hive/HiveSparkSubmitSuite.scala | 8 .../spark/streaming/scheduler/JobGenerator.scala | 6 +++--- .../apache/spark/tools/GenerateMIMAIgnore.scala | 2 ++ .../org/apache/spark/deploy/yarn/Client.scala | 4 ++-- 49 files
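The SPARK-8962 message describes `Utils.classForName` as trying the current thread's context classloader first and falling back to the classloader that loaded Spark. The Java sketch below shows that lookup order in isolation. It is not Spark's implementation: `ClassLoadingSketch` is a hypothetical class, and the fallback here is the loader of the sketch class itself, standing in for "the classloader which loaded Spark".

```java
/** Hypothetical illustration of a context-classloader-first class lookup. */
final class ClassLoadingSketch {
    private ClassLoadingSketch() {}

    /** Loads a class via the thread context classloader, falling back to this class's loader. */
    static Class<?> classForName(String name) throws ClassNotFoundException {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            // Assumed fallback: the loader that defined this sketch class.
            loader = ClassLoadingSketch.class.getClassLoader();
        }
        // The three-argument overload lets the caller choose the loader explicitly.
        return Class.forName(name, true, loader);
    }
}
```

The design point is that user-supplied classes (for example from jars added at submit time) are typically visible through the context classloader but not necessarily through the loader of the framework's own classes, which is why a style rule steering callers toward a single helper is useful.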
spark git commit: [HOTFIX] Adding new names to known contributors
Repository: spark
Updated Branches:
  refs/heads/master bb870e72f -> 5572fd0c5


[HOTFIX] Adding new names to known contributors


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5572fd0c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5572fd0c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5572fd0c

Branch: refs/heads/master
Commit: 5572fd0c518acd2e4483ff41bea1eb1cffd543ce
Parents: bb870e7
Author: Patrick Wendell <patr...@databricks.com>
Authored: Tue Jul 14 21:44:47 2015 -0700
Committer: Patrick Wendell <patr...@databricks.com>
Committed: Tue Jul 14 21:44:47 2015 -0700

----------------------------------------------------------------------
 dev/create-release/known_translations | 9 +
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5572fd0c/dev/create-release/known_translations
----------------------------------------------------------------------
diff --git a/dev/create-release/known_translations b/dev/create-release/known_translations
index 5f2671a..e462302 100644
--- a/dev/create-release/known_translations
+++ b/dev/create-release/known_translations
@@ -129,3 +129,12 @@ yongtang - Yong Tang
 ypcat - Pei-Lun Lee
 zhichao-li - Zhichao Li
 zzcclp - Zhichao Zhang
+979969786 - Yuming Wang
+Rosstin - Rosstin Murphy
+ameyc - Amey Chaugule
+animeshbaranawal - Animesh Baranawal
+cafreeman - Chris Freeman
+lee19 - Lee
+lockwobr - Brian Lockwood
+navis - Navis Ryu
+pparkkin - Paavo Parkkinen
spark git commit: [SPARK-8808] [SPARKR] Fix assignments in SparkR.
Repository: spark
Updated Branches:
  refs/heads/master 5572fd0c5 -> f650a005e

[SPARK-8808] [SPARKR] Fix assignments in SparkR.

Author: Sun Rui rui@intel.com

Closes #7395 from sun-rui/SPARK-8808 and squashes the following commits:

ce603bc [Sun Rui] Use '<-' instead of '='.
88590b1 [Sun Rui] Use '<-' instead of '='.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f650a005
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f650a005
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f650a005

Branch: refs/heads/master
Commit: f650a005e03ecd800c9005a496cc6a0d8eb68c93
Parents: 5572fd0
Author: Sun Rui rui@intel.com
Authored: Tue Jul 14 22:21:01 2015 -0700
Committer: Shivaram Venkataraman shiva...@cs.berkeley.edu
Committed: Tue Jul 14 22:21:01 2015 -0700

--
 R/pkg/R/DataFrame.R | 2 +-
 R/pkg/R/client.R | 4 ++--
 R/pkg/R/group.R | 4 ++--
 R/pkg/R/utils.R | 4 ++--
 R/pkg/inst/tests/test_binaryFile.R | 2 +-
 R/pkg/inst/tests/test_binary_function.R | 2 +-
 R/pkg/inst/tests/test_rdd.R | 4 ++--
 R/pkg/inst/tests/test_textFile.R | 2 +-
 R/pkg/inst/tests/test_utils.R | 2 +-
 9 files changed, 13 insertions(+), 13 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f650a005/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 6070282..2088137 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1328,7 +1328,7 @@ setMethod("write.df",
             jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
             options <- varargsToEnv(...)
             if (!is.null(path)) {
-              options[['path']] = path
+              options[['path']] <- path
             }
             callJMethod(df@sdf, "save", source, jmode, options)
           })

http://git-wip-us.apache.org/repos/asf/spark/blob/f650a005/R/pkg/R/client.R
--
diff --git a/R/pkg/R/client.R b/R/pkg/R/client.R
index 78c7a30..6f77215 100644
--- a/R/pkg/R/client.R
+++ b/R/pkg/R/client.R
@@ -36,9 +36,9 @@ connectBackend <- function(hostname, port, timeout = 6000) {
 determineSparkSubmitBin <- function() {
   if (.Platform$OS.type == "unix") {
-    sparkSubmitBinName = "spark-submit"
+    sparkSubmitBinName <- "spark-submit"
   } else {
-    sparkSubmitBinName = "spark-submit.cmd"
+    sparkSubmitBinName <- "spark-submit.cmd"
   }
   sparkSubmitBinName
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f650a005/R/pkg/R/group.R
--
diff --git a/R/pkg/R/group.R b/R/pkg/R/group.R
index 8f1c68f..576ac72 100644
--- a/R/pkg/R/group.R
+++ b/R/pkg/R/group.R
@@ -87,7 +87,7 @@ setMethod("count",
 setMethod("agg",
           signature(x = "GroupedData"),
           function(x, ...) {
-            cols = list(...)
+            cols <- list(...)
             stopifnot(length(cols) > 0)
             if (is.character(cols[[1]])) {
               cols <- varargsToEnv(...)
@@ -97,7 +97,7 @@ setMethod("agg",
               if (!is.null(ns)) {
                 for (n in ns) {
                   if (n != "") {
-                    cols[[n]] = alias(cols[[n]], n)
+                    cols[[n]] <- alias(cols[[n]], n)
                   }
                 }
               }

http://git-wip-us.apache.org/repos/asf/spark/blob/f650a005/R/pkg/R/utils.R
--
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index ea629a6..950ba74 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -41,8 +41,8 @@ convertJListToRList <- function(jList, flatten, logicalUpperBound = NULL,
       if (isInstanceOf(obj, "scala.Tuple2")) {
         # JavaPairRDD[Array[Byte], Array[Byte]].
-        keyBytes = callJMethod(obj, "_1")
-        valBytes = callJMethod(obj, "_2")
+        keyBytes <- callJMethod(obj, "_1")
+        valBytes <- callJMethod(obj, "_2")
         res <- list(unserialize(keyBytes), unserialize(valBytes))
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/f650a005/R/pkg/inst/tests/test_binaryFile.R
--
diff --git a/R/pkg/inst/tests/test_binaryFile.R b/R/pkg/inst/tests/test_binaryFile.R
index ccaea18..f2452ed 100644
--- a/R/pkg/inst/tests/test_binaryFile.R
+++ b/R/pkg/inst/tests/test_binaryFile.R
@@ -20,7 +20,7 @@ context("functions on binary files")
 # JavaSparkContext handle
 sc <- sparkR.init()
-mockFile = c("Spark is pretty.", "Spark is awesome.")
+mockFile <- c("Spark is pretty.",
spark git commit: Revert SPARK-6910 and SPARK-9027
Repository: spark
Updated Branches:
  refs/heads/master f23a721c1 -> c6b1a9e74

Revert SPARK-6910 and SPARK-9027

Revert #7216 and #7386. These patch seems to be causing quite a few test failures:

```
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.GeneratedMethodAccessor322.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:351)
	at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$getPartitionsByFilter$1.apply(ClientWrapper.scala:320)
	at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$getPartitionsByFilter$1.apply(ClientWrapper.scala:318)
	at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:180)
	at org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:135)
	at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:172)
	at org.apache.spark.sql.hive.client.ClientWrapper.getPartitionsByFilter(ClientWrapper.scala:318)
	at org.apache.spark.sql.hive.client.HiveTable.getPartitions(ClientInterface.scala:78)
	at org.apache.spark.sql.hive.MetastoreRelation.getHiveQlPartitions(HiveMetastoreCatalog.scala:670)
	at org.apache.spark.sql.hive.execution.HiveTableScan.doExecute(HiveTableScan.scala:137)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:90)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:90)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:89)
	at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:164)
	at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:151)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
	... 85 more
Caused by: MetaException(message:Filtering is supported only on partition keys of type string)
	at org.apache.hadoop.hive.metastore.parser.ExpressionTree$FilterBuilder.setError(ExpressionTree.java:185)
	at org.apache.hadoop.hive.metastore.parser.ExpressionTree$LeafNode.getJdoFilterPushdownParam(ExpressionTree.java:452)
	at org.apache.hadoop.hive.metastore.parser.ExpressionTree$LeafNode.generateJDOFilterOverPartitions(ExpressionTree.java:357)
	at org.apache.hadoop.hive.metastore.parser.ExpressionTree$LeafNode.generateJDOFilter(ExpressionTree.java:279)
	at org.apache.hadoop.hive.metastore.parser.ExpressionTree$TreeNode.generateJDOFilter(ExpressionTree.java:243)
	at org.apache.hadoop.hive.metastore.parser.ExpressionTree.generateJDOFilterFragment(ExpressionTree.java:590)
	at org.apache.hadoop.hive.metastore.ObjectStore.makeQueryFilterString(ObjectStore.java:2417)
	at org.apache.hadoop.hive.metastore.ObjectStore.getPartitionsViaOrmFilter(ObjectStore.java:2029)
	at org.apache.hadoop.hive.metastore.ObjectStore.access$500(ObjectStore.java:146)
	at org.apache.hadoop.hive.metastore.ObjectStore$4.getJdoResult(ObjectStore.java:2332)
```

https://amplab.cs.berkeley.edu/jenkins/view/Spark-QA-Test/job/Spark-Master-Maven-with-YARN/2945/HADOOP_PROFILE=hadoop-2.4,label=centos/testReport/junit/org.apache.spark.sql.hive.execution/SortMergeCompatibilitySuite/auto_sortmerge_join_16/

Author: Michael Armbrust mich...@databricks.com

Closes #7409 from marmbrus/revertMetastorePushdown and squashes the following commits:

92fabd3 [Michael Armbrust] Revert SPARK-6910 and SPARK-9027
5d3bdf2 [Michael Armbrust] Revert [SPARK-9027] [SQL] Generalize metastore predicate pushdown

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6b1a9e7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6b1a9e7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6b1a9e7

Branch: refs/heads/master
Commit: c6b1a9e74e34267dc198e57a184c41498ca9d6a3
Parents: f23a721
Author: Michael Armbrust mich...@databricks.com
Authored: Tue Jul 14 22:57:39 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Tue Jul 14 22:57:39 2015 -0700

--
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 58 +++
 .../org/apache/spark/sql/hive/HiveShim.scala | 1 -
 .../apache/spark/sql/hive/HiveStrategies.scala | 4 +-
 .../spark/sql/hive/client/ClientInterface.scala | 11 +--
 .../spark/sql/hive/client/ClientWrapper.scala | 21 +++--
 .../apache/spark/sql/hive/client/HiveShim.scala | 72 +-
 .../sql/hive/execution/HiveTableScan.scala
svn commit: r1691124 - in /spark/site/docs/1.4.1: ./ api/ api/R/ api/java/ api/java/org/ api/java/org/apache/ api/java/org/apache/spark/ api/java/org/apache/spark/annotation/ api/java/org/apache/spark
Author: pwendell
Date: Wed Jul 15 04:17:01 2015
New Revision: 1691124

URL: http://svn.apache.org/r1691124
Log:
Spark 1.4.1 docs

[This commit notification would consist of 734 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-8993][SQL] More comprehensive type checking in expressions.
Repository: spark
Updated Branches:
  refs/heads/master f650a005e -> f23a721c1

[SPARK-8993][SQL] More comprehensive type checking in expressions.

This patch makes the following changes:

1. ExpectsInputTypes only defines expected input types, but does not perform any implicit type casting.
2. ImplicitCastInputTypes is a new trait that defines both expected input types, as well as performs implicit type casting.
3. BinaryOperator has a new abstract function inputType, which defines the expected input type for both left/right. Concrete BinaryOperator expressions no longer perform any implicit type casting.
4. For BinaryOperators, convert NullType (i.e. null literals) into some accepted type so BinaryOperators don't need to handle NullTypes.

TODOs needed: fix unit tests for error reporting.

I'm intentionally not changing anything in aggregate expressions because yhuai is doing a big refactoring on that right now.

Author: Reynold Xin r...@databricks.com

Closes #7348 from rxin/typecheck and squashes the following commits:

8fcf814 [Reynold Xin] Fixed ordering of cases.
3bb63e7 [Reynold Xin] Style fix.
f45408f [Reynold Xin] Comment update.
aa7790e [Reynold Xin] Moved RemoveNullTypes into ImplicitTypeCasts.
438ea07 [Reynold Xin] space
d55c9e5 [Reynold Xin] Removes NullTypes.
360d124 [Reynold Xin] Fixed the rule.
fb66657 [Reynold Xin] Convert NullType into some accepted type for BinaryOperators.
2e22330 [Reynold Xin] Fixed unit tests.
4932d57 [Reynold Xin] Style fix.
d061691 [Reynold Xin] Rename existing ExpectsInputTypes -> ImplicitCastInputTypes.
e4727cc [Reynold Xin] BinaryOperator should not be doing implicit cast.
d017861 [Reynold Xin] Improve expression type checking.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f23a721c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f23a721c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f23a721c

Branch: refs/heads/master
Commit: f23a721c10b64ec5c6768634fc5e9e7b60ee7ca8
Parents: f650a00
Author: Reynold Xin r...@databricks.com
Authored: Tue Jul 14 22:52:53 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Tue Jul 14 22:52:53 2015 -0700

--
 .../catalyst/analysis/FunctionRegistry.scala | 1 +
 .../catalyst/analysis/HiveTypeCoercion.scala | 43 ++
 .../expressions/ExpectsInputTypes.scala | 17 +++-
 .../sql/catalyst/expressions/Expression.scala | 44 +-
 .../sql/catalyst/expressions/ScalaUDF.scala | 2 +-
 .../sql/catalyst/expressions/arithmetic.scala | 84 +---
 .../sql/catalyst/expressions/bitwise.scala | 30 +++
 .../spark/sql/catalyst/expressions/math.scala | 18 ++---
 .../spark/sql/catalyst/expressions/misc.scala | 8 +-
 .../sql/catalyst/expressions/predicates.scala | 83 ++-
 .../catalyst/expressions/stringOperations.scala | 36 -
 .../spark/sql/catalyst/util/TypeUtils.scala | 8 --
 .../spark/sql/types/AbstractDataType.scala | 35
 .../catalyst/analysis/AnalysisErrorSuite.scala | 2 +-
 .../analysis/ExpressionTypeCheckingSuite.scala | 6 +-
 .../analysis/HiveTypeCoercionSuite.scala | 56 +
 .../apache/spark/sql/MathExpressionsSuite.scala | 1 -
 17 files changed, 309 insertions(+), 165 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f23a721c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index ed69c42..6b1a94e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.catalyst.analysis
+import scala.language.existentials
 import scala.reflect.ClassTag
 import scala.util.{Failure, Success, Try}

http://git-wip-us.apache.org/repos/asf/spark/blob/f23a721c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index 8cb7199..15da5ee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -214,19 +214,6 @@ object HiveTypeCoercion {
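
The split between "declare expected input types" and "declare them and also cast" described in the SPARK-8993 commit message can be sketched with toy types. This is only an illustration: `DataType`, `Expr`, `Lit`, `Cast`, `ExpectsInputs`, `ImplicitCastInputs`, `StrictOp` and `LenientOp` are hypothetical names made up here, not the Catalyst classes, and having the casting trait extend the checking trait is an assumption of the sketch rather than something the commit message states.

```scala
// Hypothetical toy types; these are not the Catalyst classes.
sealed trait DataType
case object IntType extends DataType
case object StringType extends DataType

sealed trait Expr { def dataType: DataType }
case class Lit(dataType: DataType) extends Expr
case class Cast(child: Expr, dataType: DataType) extends Expr

// Declares expected input types and only checks them; never rewrites inputs.
trait ExpectsInputs {
  def inputTypes: Seq[DataType]
  def checkInputs(children: Seq[Expr]): Boolean =
    children.map(_.dataType) == inputTypes
}

// Declares expected input types and additionally wraps mismatched inputs in a cast.
// (Inheritance from ExpectsInputs is an assumption of this sketch.)
trait ImplicitCastInputs extends ExpectsInputs {
  def coerce(children: Seq[Expr]): Seq[Expr] =
    children.zip(inputTypes).map { case (c, t) =>
      if (c.dataType == t) c else Cast(c, t)
    }
}

object StrictOp extends ExpectsInputs { val inputTypes: Seq[DataType] = Seq(IntType) }
object LenientOp extends ImplicitCastInputs { val inputTypes: Seq[DataType] = Seq(IntType) }

val args: Seq[Expr] = Seq(Lit(StringType))
val strictOk = StrictOp.checkInputs(args)       // type mismatch is reported, not repaired
val coerced = LenientOp.coerce(args)            // mismatch is repaired by inserting a cast
val lenientOk = LenientOp.checkInputs(coerced)
```

In this sketch the strict operator rejects a string argument outright, while the lenient one accepts it after the inserted cast, which is the behavioural difference items 1 and 2 of the commit message describe.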
spark git commit: [SPARK-9050] [SQL] Remove unused newOrdering argument from Exchange (cleanup after SPARK-8317)
Repository: spark
Updated Branches:
  refs/heads/master e965a798d -> cc57d705e

[SPARK-9050] [SQL] Remove unused newOrdering argument from Exchange (cleanup after SPARK-8317)

SPARK-8317 changed the SQL Exchange operator so that it no longer pushed sorting into Spark's shuffle layer, a change which allowed more efficient SQL-specific sorters to be used. This patch performs some leftover cleanup based on those changes:

- Exchange's constructor should no longer accept a `newOrdering` since it's no longer used and no longer works as expected.
- `addOperatorsIfNecessary` looked at shuffle input's output ordering to decide whether to sort, but this is the wrong node to be examining: it needs to look at whether the post-shuffle node has the right ordering, since shuffling will not preserve row orderings. Thanks to davies for spotting this.

Author: Josh Rosen joshro...@databricks.com

Closes #7407 from JoshRosen/SPARK-9050 and squashes the following commits:

e70be50 [Josh Rosen] No need to wrap line
e866494 [Josh Rosen] Refactor addOperatorsIfNecessary to make code clearer
2e467da [Josh Rosen] Remove `newOrdering` from Exchange.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc57d705
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc57d705
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc57d705

Branch: refs/heads/master
Commit: cc57d705e732aefc2f3d3f438e84d71705b2eb65
Parents: e965a79
Author: Josh Rosen joshro...@databricks.com
Authored: Tue Jul 14 18:55:34 2015 -0700
Committer: Josh Rosen joshro...@databricks.com
Committed: Tue Jul 14 18:55:34 2015 -0700

--
 .../apache/spark/sql/execution/Exchange.scala | 37
 .../spark/sql/execution/SparkStrategies.scala | 3 +-
 2 files changed, 16 insertions(+), 24 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/cc57d705/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 4b783e3..feea4f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -35,21 +35,13 @@ import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEn
 /**
  * :: DeveloperApi ::
- * Performs a shuffle that will result in the desired `newPartitioning`. Optionally sorts each
- * resulting partition based on expressions from the partition key. It is invalid to construct an
- * exchange operator with a `newOrdering` that cannot be calculated using the partitioning key.
+ * Performs a shuffle that will result in the desired `newPartitioning`.
  */
 @DeveloperApi
-case class Exchange(
-    newPartitioning: Partitioning,
-    newOrdering: Seq[SortOrder],
-    child: SparkPlan)
-  extends UnaryNode {
+case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
   override def outputPartitioning: Partitioning = newPartitioning
-  override def outputOrdering: Seq[SortOrder] = newOrdering
-
   override def output: Seq[Attribute] = child.output
   /**
@@ -279,23 +271,24 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
         partitioning: Partitioning,
         rowOrdering: Seq[SortOrder],
         child: SparkPlan): SparkPlan = {
-      val needSort = rowOrdering.nonEmpty && child.outputOrdering != rowOrdering
-      val needsShuffle = child.outputPartitioning != partitioning
-      val withShuffle = if (needsShuffle) {
-        Exchange(partitioning, Nil, child)
-      } else {
-        child
+      def addShuffleIfNecessary(child: SparkPlan): SparkPlan = {
+        if (child.outputPartitioning != partitioning) {
+          Exchange(partitioning, child)
+        } else {
+          child
+        }
       }
-      val withSort = if (needSort) {
-        sqlContext.planner.BasicOperators.getSortOperator(
-          rowOrdering, global = false, withShuffle)
-      } else {
-        withShuffle
+      def addSortIfNecessary(child: SparkPlan): SparkPlan = {
+        if (rowOrdering.nonEmpty && child.outputOrdering != rowOrdering) {
+          sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, child)
+        } else {
+          child
+        }
       }
-      withSort
+      addSortIfNecessary(addShuffleIfNecessary(child))
     }
     if (meetsRequirements && compatible && !needsAnySort) {
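
The point of the SPARK-9050 refactoring, deciding whether to sort by looking at the post-shuffle node rather than at the shuffle's input, can be sketched in isolation. The plan types below (`Plan`, `Leaf`, `Shuffle`, `Sort`) and the `ensure` function are hypothetical stand-ins invented for this example, with partitionings and orderings reduced to strings; only the two helper names and their composition are taken from the diff.

```scala
// Toy plan nodes; hypothetical stand-ins for SparkPlan, Exchange and the sort operator.
sealed trait Plan {
  def partitioning: String
  def ordering: Seq[String]
}
case class Leaf(partitioning: String, ordering: Seq[String]) extends Plan
// A shuffle yields the requested partitioning and does not preserve row ordering.
case class Shuffle(partitioning: String, child: Plan) extends Plan {
  val ordering: Seq[String] = Nil
}
case class Sort(ordering: Seq[String], child: Plan) extends Plan {
  def partitioning: String = child.partitioning
}

def ensure(partitioning: String, rowOrdering: Seq[String], child: Plan): Plan = {
  def addShuffleIfNecessary(p: Plan): Plan =
    if (p.partitioning != partitioning) Shuffle(partitioning, p) else p

  // Inspects the plan it is handed, which is the post-shuffle plan when composed below.
  def addSortIfNecessary(p: Plan): Plan =
    if (rowOrdering.nonEmpty && p.ordering != rowOrdering) Sort(rowOrdering, p) else p

  addSortIfNecessary(addShuffleIfNecessary(child))
}

// The input is already ordered by "b" but partitioned the wrong way.
val input = Leaf("hash(a)", Seq("b"))
val planned = ensure("hash(b)", Seq("b"), input)
```

Here the input already has the requested ordering, so a check against the shuffle's input would skip the sort; because the check runs against the shuffled plan, whose ordering is empty, the sort is still added on top of the shuffle.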
spark git commit: [SPARK-8820] [STREAMING] Add a configuration to set checkpoint dir.
Repository: spark
Updated Branches:
  refs/heads/master cc57d705e -> f957796c4

[SPARK-8820] [STREAMING] Add a configuration to set checkpoint dir.

Add a configuration to set checkpoint directory for convenience to user.
[Jira Address](https://issues.apache.org/jira/browse/SPARK-8820)

Author: huangzhaowei carlmartin...@gmail.com

Closes #7218 from SaintBacchus/SPARK-8820 and squashes the following commits:

d49fe4b [huangzhaowei] Rename the configuration name
66ea47c [huangzhaowei] Add the unit test.
dd0acc1 [huangzhaowei] [SPARK-8820][Streaming] Add a configuration to set checkpoint dir.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f957796c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f957796c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f957796c

Branch: refs/heads/master
Commit: f957796c4b3c3cd95edfc64500a045f7e810ee87
Parents: cc57d70
Author: huangzhaowei carlmartin...@gmail.com
Authored: Tue Jul 14 19:20:49 2015 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Tue Jul 14 19:20:49 2015 -0700

--
 .../scala/org/apache/spark/streaming/StreamingContext.scala | 2 ++
 .../org/apache/spark/streaming/StreamingContextSuite.scala | 9 +
 2 files changed, 11 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f957796c/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 6b78a82..92438f1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -201,6 +201,8 @@ class StreamingContext private[streaming] (
   private var shutdownHookRef: AnyRef = _
+  conf.getOption("spark.streaming.checkpoint.directory").foreach(checkpoint)
+
   /**
    * Return the associated Spark context
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/f957796c/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
--
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 289a159..f588cf5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -115,6 +115,15 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
   }
+  test("checkPoint from conf") {
+    val checkpointDirectory = Utils.createTempDir().getAbsolutePath()
+
+    val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
+    myConf.set("spark.streaming.checkpoint.directory", checkpointDirectory)
+    val ssc = new StreamingContext(myConf, batchDuration)
+    assert(ssc.checkpointDir != null)
+  }
+
   test("state matching") {
     import StreamingContextState._
     assert(INITIALIZED === INITIALIZED)
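
The one-line change in SPARK-8820 relies on `Option.foreach` invoking its argument only when the configuration key is present. A minimal self-contained sketch of that pattern follows; `ToyContext` is a hypothetical class invented for this example, and a plain `Map` stands in for `SparkConf`, so none of this is Spark API. Only the key name `spark.streaming.checkpoint.directory` comes from the diff.

```scala
// Toy stand-in for StreamingContext; not Spark API. The Map plays the role of SparkConf.
class ToyContext(conf: Map[String, String]) {
  // Stays null unless checkpoint(...) is called, mirroring the null check in the test above.
  var checkpointDir: String = null

  def checkpoint(directory: String): Unit = {
    checkpointDir = directory
  }

  // Runs checkpoint(...) only if the key is set; does nothing when it is absent.
  conf.get("spark.streaming.checkpoint.directory").foreach(checkpoint)
}

val configured = new ToyContext(Map("spark.streaming.checkpoint.directory" -> "/tmp/ckpt"))
val unconfigured = new ToyContext(Map.empty)
```

Because the lookup returns an `Option`, no explicit "is the key set?" branch is needed, and a context built without the key behaves exactly as before the change.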