[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7412#issuecomment-121852061 [Test build #37466 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37466/consoleFull) for PR 7412 at commit [`b07e20c`](https://github.com/apache/spark/commit/b07e20c973775ee545249657416a821e90829392).
[GitHub] spark pull request: [SPARK-9016][ML][WIP] make random forest class...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7432#issuecomment-121852055 [Test build #37465 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37465/consoleFull) for PR 7432 at commit [`222a10b`](https://github.com/apache/spark/commit/222a10b68bfc9fe94c413b9347b76693d691de72).
[GitHub] spark pull request: [SPARK-9016][ML][WIP] make random forest class...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7432#issuecomment-121851930 Merged build triggered.
[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7412#issuecomment-121851944 Merged build started.
[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7412#issuecomment-121851933 Merged build triggered.
[GitHub] spark pull request: [SPARK-9016][ML][WIP] make random forest class...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7432#issuecomment-121851939 Merged build started.
[GitHub] spark pull request: [SPARK-5989] [MLlib] Model save/load for LDA
Github user MechCoder commented on a diff in the pull request: https://github.com/apache/spark/pull/6948#discussion_r34759954 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala --- @@ -184,6 +199,82 @@ class LocalLDAModel private[clustering] ( } +@Experimental +object LocalLDAModel extends Loader[LocalLDAModel] { + + private object SaveLoadV1_0 { + +val thisFormatVersion = "1.0" + +val thisClassName = "org.apache.spark.mllib.clustering.LocalLDAModel" + +// Store the distribution of terms of each topic as a Row in data. +case class Data(topic: Vector) + +def save(sc: SparkContext, path: String, topicsMatrix: Matrix): Unit = { + + val sqlContext = new SQLContext(sc) + import sqlContext.implicits._ + + val k = topicsMatrix.numCols + val metadata = compact(render +(("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ + ("k" -> k) ~ ("vocabSize" -> topicsMatrix.numRows))) + sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + + val topicsDenseMatrix = topicsMatrix.toBreeze.toDenseMatrix + val topics = Range(0, k).map { topicInd => +Data(Vectors.dense((topicsDenseMatrix(::, topicInd).toArray))) + }.toSeq + sc.parallelize(topics, 1).toDF().write.parquet(Loader.dataPath(path)) +} + +def load(sc: SparkContext, path: String): LocalLDAModel = { + + val dataPath = Loader.dataPath(path) + val sqlContext = SQLContext.getOrCreate(sc) + val dataFrame = sqlContext.read.parquet(dataPath) + + Loader.checkSchema[Data](dataFrame.schema) + val topics = dataFrame.collect() --- End diff -- ouch
[GitHub] spark pull request: [SPARK-9085][SQL] Remove LeafNode, UnaryNode, ...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/7434#discussion_r34759969 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -40,13 +40,14 @@ import org.apache.spark.sql.types._ * requested. The attributes produced by this function will be automatically copied anytime rules * result in changes to the Generator or its children. */ -abstract class Generator extends Expression { - self: Product => +trait Generator extends Expression { self: Product => --- End diff -- we technically can for this one -- but what do we gain from it?
[GitHub] spark pull request: [SPARK-9085][SQL] Remove LeafNode, UnaryNode, ...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/7434#discussion_r34759923 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala --- @@ -20,17 +20,21 @@ package org.apache.spark.sql.catalyst.expressions import com.clearspring.analytics.stream.cardinality.HyperLogLog import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.util.TypeUtils import org.apache.spark.sql.types._ import org.apache.spark.util.collection.OpenHashSet -abstract class AggregateExpression extends Expression { +trait AggregateExpression extends Expression { --- End diff -- it'd result in a lot more code changes to get it working. e.g. ```scala abstract class AggregateFunction extends LeafExpression with AggregateExpression with Serializable { self: Product => /** Base should return the generic aggregate expression that this function is computing */ val base: AggregateExpression override def nullable: Boolean = base.nullable <--- this won't work anymore override def dataType: DataType = base.dataType ```
[GitHub] spark pull request: [SPARK-9085][SQL] Remove LeafNode, UnaryNode, ...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/7434#discussion_r34759850 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala --- @@ -277,15 +276,21 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { /** * A logical plan node with no children. */ -abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] { +abstract class LeafNode extends LogicalPlan { self: Product => --- End diff -- it's still required
[GitHub] spark pull request: [SPARK-9085][SQL] Remove LeafNode, UnaryNode, ...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/7434#discussion_r34759860 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala --- @@ -277,15 +276,21 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { /** * A logical plan node with no children. */ -abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] { +abstract class LeafNode extends LogicalPlan { self: Product => + + override def children: Seq[LogicalPlan] = Nil } /** * A logical plan node with single child. */ -abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] { +abstract class UnaryNode extends LogicalPlan { self: Product => --- End diff -- it's still required
[GitHub] spark pull request: [SPARK-9085][SQL] Remove LeafNode, UnaryNode, ...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/7434#discussion_r34759808 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala --- @@ -96,7 +95,7 @@ case class UnresolvedFunction(name: String, children: Seq[Expression]) extends E * Represents all of the input attributes to a given relational operator, for example in * "SELECT * FROM ...". A [[Star]] gets automatically expanded during analysis. */ -trait Star extends NamedExpression with trees.LeafNode[Expression] { +abstract class Star extends LeafExpression with NamedExpression { self: Product => --- End diff -- no, it's still required
[GitHub] spark pull request: [SPARK-5989] [MLlib] Model save/load for LDA
Github user MechCoder commented on a diff in the pull request: https://github.com/apache/spark/pull/6948#discussion_r34759686 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala --- @@ -184,6 +199,82 @@ class LocalLDAModel private[clustering] ( } +@Experimental +object LocalLDAModel extends Loader[LocalLDAModel] { + + private object SaveLoadV1_0 { + +val thisFormatVersion = "1.0" + +val thisClassName = "org.apache.spark.mllib.clustering.LocalLDAModel" + +// Store the distribution of terms of each topic as a Row in data. +case class Data(topic: Vector) + +def save(sc: SparkContext, path: String, topicsMatrix: Matrix): Unit = { + + val sqlContext = new SQLContext(sc) --- End diff -- I thought I had fixed these, but it seems I did it just for one model, leaving out the other.
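(Editorial note: the fix being referred to is presumably reusing a shared SQLContext in the save path, as the load path in the same diff already does. The sketch below illustrates that pattern only; the method signature and column names are made up, not the PR's final code.)

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object SaveSketch {
  // Hypothetical save helper: obtain the SQLContext via getOrCreate instead of
  // constructing `new SQLContext(sc)` on every call, mirroring load() in the diff.
  def save(sc: SparkContext, path: String, topics: Seq[(Int, Array[Double])]): Unit = {
    val sqlContext = SQLContext.getOrCreate(sc)
    import sqlContext.implicits._
    // Persist the per-topic rows as Parquet, as the PR does for model data.
    sc.parallelize(topics, 1).toDF("topic", "termWeights").write.parquet(path)
  }
}
```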
[GitHub] spark pull request: [Spark-7422][MLLIB] Add argmax to Vector, Spar...
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/6112#issuecomment-121849815 @GeorgeDittmar I will try to fix it and send you a PR.
[GitHub] spark pull request: [SPARK-8807][SparkR] Add between operator in S...
Github user sun-rui commented on the pull request: https://github.com/apache/spark/pull/7356#issuecomment-121848726 @viirya, 1. Why pass the lower bound and upper bound in a vector instead of passing them separately? They are passed separately in the Scala API, and a vector cannot hold complex types (at least a list would be expected). 2. Instead of writing R wrapper functions individually for each method of the Scala Column, Column.R currently creates wrapper functions dynamically from a template. Is it possible to use the same mechanism to support between? Something like: column_functions3 <- c("between") createColumnFunction3 <- function(name) { setMethod(name, signature(x = "Column"), function(x, arg1, arg2) { ... # some check of arg1/arg2 jc <- callJMethod(x@jc, name, arg1, arg2) column(jc) }) }
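(Editorial note: for comparison, the Scala API referred to above takes the two bounds as separate arguments. A minimal, self-contained sketch, with made-up data and column names:)

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object BetweenSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("between-sketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(Seq(("a", 10), ("b", 25), ("c", 70))).toDF("name", "age")
    // Column.between takes lowerBound and upperBound as two separate arguments.
    df.filter($"age".between(18, 65)).show()

    sc.stop()
  }
}
```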
[GitHub] spark pull request: [SPARK-8807][SparkR] Add between operator in S...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/7356
[GitHub] spark pull request: [SPARK-8972][SQL]Incorrect result for rollup
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/7343
[GitHub] spark pull request: [SPARK-9058][SQL] Split projectionCode if it i...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7418#issuecomment-121846610 [Test build #37464 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37464/consoleFull) for PR 7418 at commit [`b8e274e`](https://github.com/apache/spark/commit/b8e274e50e97303c3dfc8de668f7005c80a069e2).
[GitHub] spark pull request: [SPARK-8995][SQL] cast date strings like '2015...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/7353#issuecomment-121846456 LGTM, just some minor comments.
[GitHub] spark pull request: [SPARK-8972][SQL]Incorrect result for rollup
Github user yhuai commented on the pull request: https://github.com/apache/spark/pull/7343#issuecomment-121846486 LGTM. I am merging it to master.
[GitHub] spark pull request: [SPARK-8995][SQL] cast date strings like '2015...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/7353#discussion_r34759205 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -180,4 +182,202 @@ object DateTimeUtils { val nanos = (us % MICROS_PER_SECOND) * 1000L (day.toInt, secondsInDay * NANOS_PER_SECOND + nanos) } + + /** + * Parses a given UTF8 date string to the corresponding a corresponding [[Long]] value. + * The return type is [[Option]] in order to distinguish between 0L and null. The following + * formats are allowed: + * + * `` + * `-[m]m` + * `-[m]m-[d]d` + * `-[m]m-[d]d ` + * `-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]` + * `-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]Z` + * `-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]-[h]h:[m]m` + * `-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]+[h]h:[m]m` + * `-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]` + * `-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]Z` + * `-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]-[h]h:[m]m` + * `-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]+[h]h:[m]m` + * `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]` + * `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]Z` + * `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]-[h]h:[m]m` + * `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]+[h]h:[m]m` + * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]` + * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]Z` + * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]-[h]h:[m]m` + * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]+[h]h:[m]m` + */ + def stringToTimestamp(s: UTF8String): Option[Long] = { +if (s == null) { + return None +} +var timeZone: Option[Byte] = None +val segments: Array[Int] = Array[Int](1, 1, 1, 0, 0, 0, 0, 0, 0) +var i = 0 +var currentSegmentValue = 0 +val bytes = s.getBytes +var j = 0 +var digitsMilli = 0 +var justTime = false +while (j < bytes.length) { + val b = bytes(j) + val parsedValue = b - '0'.toByte + if (parsedValue < 0 || parsedValue > 9) { +if (j == 0 && b == 'T') { + justTime = true + i += 3 +} else if (i < 2) { + if (b == '-') { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 + } else if (i == 0 && b == ':') { +justTime = true +segments(3) = currentSegmentValue +currentSegmentValue = 0 +i = 4 + } else { +return None + } +} else if (i == 2) { + if (b == ' ' || b == 'T') { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 + } else { +return None + } +} else if (i == 3 || i == 4) { + if (b == ':') { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 + } else { +return None + } +} else if (i == 5 || i == 6) { + if (b == 'Z') { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 +timeZone = Some(43) + } else if (b == '-' || b == '+') { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 +timeZone = Some(b) + } else if (b == '.' && i == 5) { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 + } else { +return None + } + if (i == 6 && b != '.') { +i += 1 + } +} else { + if (b == ':' || b == ' ') { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 + } else { +return None + } +} + } else { +if (i == 6) { + digitsMilli += 1 +} +currentSegmentValue = currentSegmentValue * 10 + parsedValue + } + j += 1 +} + +segments(i) = currentSegmentValue + +while (digitsMilli < 6) { + segments(6) *= 10 + digitsMilli += 1 +} + +if (!justTime && (segments(0) < 1000 || segments(0) > || segments(1) < 1 || +segments(1) > 12 || segments(2) < 1 || segments(2) > 31)) { + re
[GitHub] spark pull request: [SPARK-9058][SQL] Split projectionCode if it i...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7418#issuecomment-121846231 Merged build started.
[GitHub] spark pull request: [SPARK-8995][SQL] cast date strings like '2015...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/7353#discussion_r34759109 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -180,4 +182,202 @@ object DateTimeUtils { val nanos = (us % MICROS_PER_SECOND) * 1000L (day.toInt, secondsInDay * NANOS_PER_SECOND + nanos) } + + /** + * Parses a given UTF8 date string to the corresponding a corresponding [[Long]] value. + * The return type is [[Option]] in order to distinguish between 0L and null. The following + * formats are allowed: + * + * `` + * `-[m]m` + * `-[m]m-[d]d` + * `-[m]m-[d]d ` + * `-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]` + * `-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]Z` + * `-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]-[h]h:[m]m` + * `-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]+[h]h:[m]m` + * `-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]` + * `-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]Z` + * `-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]-[h]h:[m]m` + * `-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]+[h]h:[m]m` + * `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]` + * `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]Z` + * `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]-[h]h:[m]m` + * `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]+[h]h:[m]m` + * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]` + * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]Z` + * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]-[h]h:[m]m` + * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]+[h]h:[m]m` + */ + def stringToTimestamp(s: UTF8String): Option[Long] = { +if (s == null) { + return None +} +var timeZone: Option[Byte] = None +val segments: Array[Int] = Array[Int](1, 1, 1, 0, 0, 0, 0, 0, 0) +var i = 0 +var currentSegmentValue = 0 +val bytes = s.getBytes +var j = 0 +var digitsMilli = 0 +var justTime = false +while (j < bytes.length) { + val b = bytes(j) + val parsedValue = b - '0'.toByte + if (parsedValue < 0 || parsedValue > 9) { +if (j == 0 && b == 'T') { + justTime = true + i += 3 +} else if (i < 2) { + if (b == '-') { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 + } else if (i == 0 && b == ':') { +justTime = true +segments(3) = currentSegmentValue +currentSegmentValue = 0 +i = 4 + } else { +return None + } +} else if (i == 2) { + if (b == ' ' || b == 'T') { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 + } else { +return None + } +} else if (i == 3 || i == 4) { + if (b == ':') { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 + } else { +return None + } +} else if (i == 5 || i == 6) { + if (b == 'Z') { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 +timeZone = Some(43) + } else if (b == '-' || b == '+') { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 +timeZone = Some(b) + } else if (b == '.' && i == 5) { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 + } else { +return None + } + if (i == 6 && b != '.') { +i += 1 + } +} else { + if (b == ':' || b == ' ') { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 + } else { +return None + } +} + } else { +if (i == 6) { + digitsMilli += 1 +} +currentSegmentValue = currentSegmentValue * 10 + parsedValue + } + j += 1 +} + +segments(i) = currentSegmentValue + +while (digitsMilli < 6) { + segments(6) *= 10 + digitsMilli += 1 +} + +if (!justTime && (segments(0) < 1000 || segments(0) > || segments(1) < 1 || +segments(1) > 12 || segments(2) < 1 || segments(2) > 31)) { + re
[GitHub] spark pull request: [SPARK-9058][SQL] Split projectionCode if it i...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7418#issuecomment-121846213 Merged build triggered.
[GitHub] spark pull request: [SPARK-8995][SQL] cast date strings like '2015...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/7353#discussion_r34758993 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -180,4 +182,202 @@ object DateTimeUtils { val nanos = (us % MICROS_PER_SECOND) * 1000L (day.toInt, secondsInDay * NANOS_PER_SECOND + nanos) } + + /** + * Parses a given UTF8 date string to the corresponding a corresponding [[Long]] value. + * The return type is [[Option]] in order to distinguish between 0L and null. The following + * formats are allowed: + * + * `` + * `-[m]m` + * `-[m]m-[d]d` + * `-[m]m-[d]d ` + * `-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]` + * `-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]Z` + * `-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]-[h]h:[m]m` + * `-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]+[h]h:[m]m` + * `-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]` + * `-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]Z` + * `-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]-[h]h:[m]m` + * `-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]+[h]h:[m]m` + * `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]` + * `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]Z` + * `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]-[h]h:[m]m` + * `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]+[h]h:[m]m` + * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]` + * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]Z` + * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]-[h]h:[m]m` + * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]+[h]h:[m]m` + */ + def stringToTimestamp(s: UTF8String): Option[Long] = { +if (s == null) { + return None +} +var timeZone: Option[Byte] = None +val segments: Array[Int] = Array[Int](1, 1, 1, 0, 0, 0, 0, 0, 0) +var i = 0 +var currentSegmentValue = 0 +val bytes = s.getBytes +var j = 0 +var digitsMilli = 0 +var justTime = false +while (j < bytes.length) { + val b = bytes(j) + val parsedValue = b - '0'.toByte + if (parsedValue < 0 || parsedValue > 9) { +if (j == 0 && b == 'T') { + justTime = true + i += 3 +} else if (i < 2) { + if (b == '-') { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 + } else if (i == 0 && b == ':') { +justTime = true +segments(3) = currentSegmentValue +currentSegmentValue = 0 +i = 4 + } else { +return None + } +} else if (i == 2) { + if (b == ' ' || b == 'T') { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 + } else { +return None + } +} else if (i == 3 || i == 4) { + if (b == ':') { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 + } else { +return None + } +} else if (i == 5 || i == 6) { + if (b == 'Z') { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 +timeZone = Some(43) + } else if (b == '-' || b == '+') { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 +timeZone = Some(b) + } else if (b == '.' && i == 5) { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 + } else { +return None + } + if (i == 6 && b != '.') { +i += 1 + } +} else { + if (b == ':' || b == ' ') { +segments(i) = currentSegmentValue +currentSegmentValue = 0 +i += 1 + } else { +return None + } +} + } else { +if (i == 6) { + digitsMilli += 1 +} +currentSegmentValue = currentSegmentValue * 10 + parsedValue + } + j += 1 +} + +segments(i) = currentSegmentValue + +while (digitsMilli < 6) { + segments(6) *= 10 + digitsMilli += 1 +} + +if (!justTime && (segments(0) < 1000 || segments(0) > || segments(1) < 1 || +segments(1) > 12 || segments(2) < 1 || segments(2) > 31)) { + re
[GitHub] spark pull request: [SPARK-8995][SQL] cast date strings like '2015...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/7353#discussion_r34758673 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala --- @@ -86,4 +88,198 @@ class DateTimeUtilsSuite extends SparkFunSuite { checkFromToJavaDate(new Date(df1.parse("1776-07-04 10:30:00").getTime)) checkFromToJavaDate(new Date(df2.parse("1776-07-04 18:30:00 UTC").getTime)) } + + test("string to date") { +val millisPerDay = 1000L * 3600L * 24L +var c = Calendar.getInstance() +c.set(2015, 0, 28, 0, 0, 0) +c.set(Calendar.MILLISECOND, 0) + assert(DateTimeUtils.stringToDate(UTF8String.fromString("2015-01-28")).get == + c.getTimeInMillis / millisPerDay) +c.set(2015, 0, 1, 0, 0, 0) +c.set(Calendar.MILLISECOND, 0) +assert(DateTimeUtils.stringToDate(UTF8String.fromString("2015")).get == + c.getTimeInMillis / millisPerDay) +c = Calendar.getInstance() +c.set(2015, 2, 1, 0, 0, 0) +c.set(Calendar.MILLISECOND, 0) + assert(DateTimeUtils.stringToDate(UTF8String.fromString("2015-03")).get == + c.getTimeInMillis / millisPerDay) +c = Calendar.getInstance() +c.set(2015, 2, 18, 0, 0, 0) +c.set(Calendar.MILLISECOND, 0) + assert(DateTimeUtils.stringToDate(UTF8String.fromString("2015-03-18")).get == + c.getTimeInMillis / millisPerDay) +assert(DateTimeUtils.stringToDate(UTF8String.fromString("2015-03-18 ")).get == + c.getTimeInMillis / millisPerDay) +assert(DateTimeUtils.stringToDate(UTF8String.fromString("2015-03-18 123142")).get == + c.getTimeInMillis / millisPerDay) + assert(DateTimeUtils.stringToDate(UTF8String.fromString("2015-03-18T123123")).get == + c.getTimeInMillis / millisPerDay) + assert(DateTimeUtils.stringToDate(UTF8String.fromString("2015-03-18T")).get == + c.getTimeInMillis / millisPerDay) + + assert(DateTimeUtils.stringToDate(UTF8String.fromString("2015-03-18X")).isEmpty) + assert(DateTimeUtils.stringToDate(UTF8String.fromString("2015/03/18")).isEmpty) + assert(DateTimeUtils.stringToDate(UTF8String.fromString("2015.03.18")).isEmpty) + assert(DateTimeUtils.stringToDate(UTF8String.fromString("20150318")).isEmpty) + assert(DateTimeUtils.stringToDate(UTF8String.fromString("2015-031-8")).isEmpty) + } + + test("string to timestamp") { +var c = Calendar.getInstance() +c.set(1969, 11, 31, 16, 0, 0) +c.set(Calendar.MILLISECOND, 0) + assert(DateTimeUtils.stringToTimestamp(UTF8String.fromString("1969-12-31 16:00:00")).get == + c.getTimeInMillis * 1000) +c.set(2015, 0, 1, 0, 0, 0) +c.set(Calendar.MILLISECOND, 0) + assert(DateTimeUtils.stringToTimestamp(UTF8String.fromString("2015")).get == --- End diff -- It's better to use `===` in tests (for better failure messages).
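(Editorial note: a small illustration of the suggestion, as a standalone ScalaTest suite rather than the actual DateTimeUtilsSuite. The helper and its value are hypothetical; the point is that `===` reports both sides of a failed comparison, e.g. "123 did not equal 456", instead of a bare assertion failure.)

```scala
import org.scalatest.FunSuite

class TripleEqualsSketch extends FunSuite {
  test("=== reports the mismatched values on failure") {
    val expected = 1420070400000000L
    val actual = computeTimestamp() // hypothetical value under test
    // On failure this prints "<actual> did not equal <expected>",
    // which is much easier to debug than a plain boolean assertion.
    assert(actual === expected)
  }

  // Stand-in for the real computation; only here to make the sketch compile.
  private def computeTimestamp(): Long = 1420070400000000L
}
```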
[GitHub] spark pull request: [SPARK-8807][SparkR] Add between operator in S...
Github user shivaram commented on the pull request: https://github.com/apache/spark/pull/7356#issuecomment-121844789 Thanks @viirya -- LGTM.
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r34758587 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -872,6 +872,25 @@ class DAGScheduler( // will be posted, which should always come after a corresponding SparkListenerStageSubmitted // event. stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size)) +val taskIdToLocations = try { + stage match { +case s: ShuffleMapStage => + partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap +case s: ResultStage => + val job = s.resultOfJob.get + partitionsToCompute.map { id => +val p = job.partitions(id) +(id, getPreferredLocs(stage.rdd, p)) + }.toMap + } +} catch { + case NonFatal(e) => +abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}") +runningStages -= stage +return +} +stage.latestInfo.taskLocalityPreferences = Some(taskIdToLocations.values.toSeq) --- End diff -- Yeah, this is a way; I will change it. Another concern is the MiMa check: since this is a public class, I will test it locally. Thanks for your advice :)
[GitHub] spark pull request: [SPARK-8807][SparkR] Add between operator in S...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7356#issuecomment-121844520 [Test build #37463 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37463/console) for PR 7356 at commit [`7f51b44`](https://github.com/apache/spark/commit/7f51b441974b73384ebed276cb567b0e09750059). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-8807][SparkR] Add between operator in S...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7356#issuecomment-121844558 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-8271][SQL]string function: soundex
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/7115#issuecomment-121844421 @HuJiayin you need to rebase the code.
[GitHub] spark pull request: [SPARK-9058][SQL] Split projectionCode if it i...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7418#issuecomment-121843409 [Test build #37461 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37461/console) for PR 7418 at commit [`12d3794`](https://github.com/apache/spark/commit/12d3794b009a90d21de9a1d52d4f3ea9503f2b58). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-9058][SQL] Split projectionCode if it i...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7418#issuecomment-121843429 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-8807][SparkR] Add between operator in S...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7356#issuecomment-121843303 [Test build #37463 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37463/consoleFull) for PR 7356 at commit [`7f51b44`](https://github.com/apache/spark/commit/7f51b441974b73384ebed276cb567b0e09750059).
[GitHub] spark pull request: [SPARK-7131] [ml] Copy Decision Tree, Random F...
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/7294#issuecomment-121842954 @jkbradley I made one pass: 1. Some of my comments also apply to the old implementation, so I don't think it is necessary to address them in this PR. 2. It is quite hard to track what has been changed, because you merged DecisionTree and RandomForest into one file and removed bins. Next time we should think of a better approach that makes the real changes more discoverable. Except for some style issues and the random seed, it looks good to me :)
[GitHub] spark pull request: [SPARK-8807][SparkR] Add between operator in S...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7356#issuecomment-121842704 Merged build started.
[GitHub] spark pull request: [SPARK-8807][SparkR] Add between operator in S...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7356#issuecomment-121842697 Merged build triggered.
[GitHub] spark pull request: [SPARK-7131] [ml] Copy Decision Tree, Random F...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/7294#discussion_r34757704 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala --- @@ -0,0 +1,1131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import java.io.IOException + +import scala.collection.mutable +import scala.util.Random + +import org.apache.spark.Logging +import org.apache.spark.ml.classification.DecisionTreeClassificationModel +import org.apache.spark.ml.regression.DecisionTreeRegressionModel +import org.apache.spark.ml.tree._ +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy} +import org.apache.spark.mllib.tree.impl.{BaggedPoint, DTStatsAggregator, DecisionTreeMetadata, + TimeTracker} +import org.apache.spark.mllib.tree.impurity.ImpurityCalculator +import org.apache.spark.mllib.tree.model.{InformationGainStats, Predict} +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.random.{SamplingUtils, XORShiftRandom} + + +private[ml] object RandomForest extends Logging { + + /** + * Train a random forest. + * @param input Training data: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]] + * @return an unweighted set of trees + */ + def run( + input: RDD[LabeledPoint], + strategy: OldStrategy, + numTrees: Int, + featureSubsetStrategy: String, + seed: Long, + parentUID: Option[String] = None): Array[DecisionTreeModel] = { + +val timer = new TimeTracker() + +timer.start("total") + +timer.start("init") + +val retaggedInput = input.retag(classOf[LabeledPoint]) +val metadata = + DecisionTreeMetadata.buildMetadata(retaggedInput, strategy, numTrees, featureSubsetStrategy) +logDebug("algo = " + strategy.algo) +logDebug("numTrees = " + numTrees) +logDebug("seed = " + seed) +logDebug("maxBins = " + metadata.maxBins) +logDebug("featureSubsetStrategy = " + featureSubsetStrategy) +logDebug("numFeaturesPerNode = " + metadata.numFeaturesPerNode) +logDebug("subsamplingRate = " + strategy.subsamplingRate) + +// Find the splits and the corresponding bins (interval between the splits) using a sample +// of the input data. +timer.start("findSplitsBins") +val splits = findSplits(retaggedInput, metadata) +timer.stop("findSplitsBins") +logDebug("numBins: feature: number of bins") +logDebug(Range(0, metadata.numFeatures).map { featureIndex => + s"\t$featureIndex\t${metadata.numBins(featureIndex)}" +}.mkString("\n")) + +// Bin feature values (TreePoint representation). +// Cache input RDD for speedup during multiple passes. 
+val treeInput = TreePoint.convertToTreeRDD(retaggedInput, splits, metadata) + +val withReplacement = if (numTrees > 1) true else false + +val baggedInput = BaggedPoint.convertToBaggedRDD(treeInput, strategy.subsamplingRate, numTrees, + withReplacement, seed).persist(StorageLevel.MEMORY_AND_DISK) --- End diff -- Put `.persist` on a new line for readability. ~~~scala val baggedInput = BaggedPoint.convertToBaggedRDD( treeInput, strategy.subsamplingRate, numTrees, withReplacement, seed ).persist(StorageLevel.MEMORY_AND_DISK) ~~~
[GitHub] spark pull request: [SPARK-7131] [ml] Copy Decision Tree, Random F...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/7294#discussion_r34757701 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala --- @@ -0,0 +1,1131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import java.io.IOException + +import scala.collection.mutable +import scala.util.Random + +import org.apache.spark.Logging +import org.apache.spark.ml.classification.DecisionTreeClassificationModel +import org.apache.spark.ml.regression.DecisionTreeRegressionModel +import org.apache.spark.ml.tree._ +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy} +import org.apache.spark.mllib.tree.impl.{BaggedPoint, DTStatsAggregator, DecisionTreeMetadata, + TimeTracker} +import org.apache.spark.mllib.tree.impurity.ImpurityCalculator +import org.apache.spark.mllib.tree.model.{InformationGainStats, Predict} +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.random.{SamplingUtils, XORShiftRandom} + + +private[ml] object RandomForest extends Logging { + + /** + * Train a random forest. + * @param input Training data: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]] + * @return an unweighted set of trees + */ + def run( + input: RDD[LabeledPoint], + strategy: OldStrategy, + numTrees: Int, + featureSubsetStrategy: String, + seed: Long, + parentUID: Option[String] = None): Array[DecisionTreeModel] = { + +val timer = new TimeTracker() + +timer.start("total") + +timer.start("init") + +val retaggedInput = input.retag(classOf[LabeledPoint]) +val metadata = + DecisionTreeMetadata.buildMetadata(retaggedInput, strategy, numTrees, featureSubsetStrategy) +logDebug("algo = " + strategy.algo) +logDebug("numTrees = " + numTrees) +logDebug("seed = " + seed) +logDebug("maxBins = " + metadata.maxBins) +logDebug("featureSubsetStrategy = " + featureSubsetStrategy) +logDebug("numFeaturesPerNode = " + metadata.numFeaturesPerNode) +logDebug("subsamplingRate = " + strategy.subsamplingRate) + +// Find the splits and the corresponding bins (interval between the splits) using a sample +// of the input data. +timer.start("findSplitsBins") +val splits = findSplits(retaggedInput, metadata) +timer.stop("findSplitsBins") +logDebug("numBins: feature: number of bins") +logDebug(Range(0, metadata.numFeatures).map { featureIndex => + s"\t$featureIndex\t${metadata.numBins(featureIndex)}" +}.mkString("\n")) + +// Bin feature values (TreePoint representation). +// Cache input RDD for speedup during multiple passes. 
+val treeInput = TreePoint.convertToTreeRDD(retaggedInput, splits, metadata) + +val withReplacement = if (numTrees > 1) true else false --- End diff -- `if (numTrees > 1) true else false` -> `numTrees > 1`
[GitHub] spark pull request: [SPARK-7131] [ml] Copy Decision Tree, Random F...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/7294#discussion_r34757708 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala --- @@ -0,0 +1,1131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import java.io.IOException + +import scala.collection.mutable +import scala.util.Random + +import org.apache.spark.Logging +import org.apache.spark.ml.classification.DecisionTreeClassificationModel +import org.apache.spark.ml.regression.DecisionTreeRegressionModel +import org.apache.spark.ml.tree._ +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy} +import org.apache.spark.mllib.tree.impl.{BaggedPoint, DTStatsAggregator, DecisionTreeMetadata, + TimeTracker} +import org.apache.spark.mllib.tree.impurity.ImpurityCalculator +import org.apache.spark.mllib.tree.model.{InformationGainStats, Predict} +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.random.{SamplingUtils, XORShiftRandom} + + +private[ml] object RandomForest extends Logging { + + /** + * Train a random forest. + * @param input Training data: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]] + * @return an unweighted set of trees + */ + def run( + input: RDD[LabeledPoint], + strategy: OldStrategy, + numTrees: Int, + featureSubsetStrategy: String, + seed: Long, + parentUID: Option[String] = None): Array[DecisionTreeModel] = { + +val timer = new TimeTracker() + +timer.start("total") + +timer.start("init") + +val retaggedInput = input.retag(classOf[LabeledPoint]) +val metadata = + DecisionTreeMetadata.buildMetadata(retaggedInput, strategy, numTrees, featureSubsetStrategy) +logDebug("algo = " + strategy.algo) +logDebug("numTrees = " + numTrees) +logDebug("seed = " + seed) +logDebug("maxBins = " + metadata.maxBins) +logDebug("featureSubsetStrategy = " + featureSubsetStrategy) +logDebug("numFeaturesPerNode = " + metadata.numFeaturesPerNode) +logDebug("subsamplingRate = " + strategy.subsamplingRate) + +// Find the splits and the corresponding bins (interval between the splits) using a sample +// of the input data. +timer.start("findSplitsBins") +val splits = findSplits(retaggedInput, metadata) +timer.stop("findSplitsBins") +logDebug("numBins: feature: number of bins") +logDebug(Range(0, metadata.numFeatures).map { featureIndex => + s"\t$featureIndex\t${metadata.numBins(featureIndex)}" +}.mkString("\n")) + +// Bin feature values (TreePoint representation). +// Cache input RDD for speedup during multiple passes. 
+val treeInput = TreePoint.convertToTreeRDD(retaggedInput, splits, metadata) + +val withReplacement = if (numTrees > 1) true else false + +val baggedInput = BaggedPoint.convertToBaggedRDD(treeInput, strategy.subsamplingRate, numTrees, + withReplacement, seed).persist(StorageLevel.MEMORY_AND_DISK) + +// depth of the decision tree +val maxDepth = strategy.maxDepth +require(maxDepth <= 30, + s"DecisionTree currently only supports maxDepth <= 30, but was given maxDepth = $maxDepth.") + +// Max memory usage for aggregates +// TODO: Calculate memory usage more precisely. +val maxMemoryUsage: Long = strategy.maxMemoryInMB * 1024L * 1024L +logDebug("max memory usage for aggregates = " + maxMemoryUsage + " bytes.") +val maxMemoryPerNode = { + val featureSubset: Option[Array[Int]] = if (metadata.subsamplingFeatures) { +// Find numFeaturesPerNode largest bins to get an upper bound on memory usage. +
[GitHub] spark pull request: [SPARK-7131] [ml] Copy Decision Tree, Random F...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/7294#discussion_r34757721 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala --- @@ -0,0 +1,1131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import java.io.IOException + +import scala.collection.mutable +import scala.util.Random + +import org.apache.spark.Logging +import org.apache.spark.ml.classification.DecisionTreeClassificationModel +import org.apache.spark.ml.regression.DecisionTreeRegressionModel +import org.apache.spark.ml.tree._ +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy} +import org.apache.spark.mllib.tree.impl.{BaggedPoint, DTStatsAggregator, DecisionTreeMetadata, + TimeTracker} +import org.apache.spark.mllib.tree.impurity.ImpurityCalculator +import org.apache.spark.mllib.tree.model.{InformationGainStats, Predict} +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.random.{SamplingUtils, XORShiftRandom} + + +private[ml] object RandomForest extends Logging { + + /** + * Train a random forest. + * @param input Training data: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]] + * @return an unweighted set of trees + */ + def run( + input: RDD[LabeledPoint], + strategy: OldStrategy, + numTrees: Int, + featureSubsetStrategy: String, + seed: Long, + parentUID: Option[String] = None): Array[DecisionTreeModel] = { + +val timer = new TimeTracker() + +timer.start("total") + +timer.start("init") + +val retaggedInput = input.retag(classOf[LabeledPoint]) +val metadata = + DecisionTreeMetadata.buildMetadata(retaggedInput, strategy, numTrees, featureSubsetStrategy) +logDebug("algo = " + strategy.algo) +logDebug("numTrees = " + numTrees) +logDebug("seed = " + seed) +logDebug("maxBins = " + metadata.maxBins) +logDebug("featureSubsetStrategy = " + featureSubsetStrategy) +logDebug("numFeaturesPerNode = " + metadata.numFeaturesPerNode) +logDebug("subsamplingRate = " + strategy.subsamplingRate) + +// Find the splits and the corresponding bins (interval between the splits) using a sample +// of the input data. +timer.start("findSplitsBins") +val splits = findSplits(retaggedInput, metadata) +timer.stop("findSplitsBins") +logDebug("numBins: feature: number of bins") +logDebug(Range(0, metadata.numFeatures).map { featureIndex => + s"\t$featureIndex\t${metadata.numBins(featureIndex)}" +}.mkString("\n")) + +// Bin feature values (TreePoint representation). +// Cache input RDD for speedup during multiple passes. 
+val treeInput = TreePoint.convertToTreeRDD(retaggedInput, splits, metadata) + +val withReplacement = if (numTrees > 1) true else false + +val baggedInput = BaggedPoint.convertToBaggedRDD(treeInput, strategy.subsamplingRate, numTrees, + withReplacement, seed).persist(StorageLevel.MEMORY_AND_DISK) + +// depth of the decision tree +val maxDepth = strategy.maxDepth +require(maxDepth <= 30, + s"DecisionTree currently only supports maxDepth <= 30, but was given maxDepth = $maxDepth.") + +// Max memory usage for aggregates +// TODO: Calculate memory usage more precisely. +val maxMemoryUsage: Long = strategy.maxMemoryInMB * 1024L * 1024L +logDebug("max memory usage for aggregates = " + maxMemoryUsage + " bytes.") +val maxMemoryPerNode = { + val featureSubset: Option[Array[Int]] = if (metadata.subsamplingFeatures) { +// Find numFeaturesPerNode largest bins to get an upper bound on memory usage. +
[GitHub] spark pull request: [SPARK-1855] Local checkpointing
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7279#issuecomment-121840805 [Test build #1081 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1081/consoleFull) for PR 7279 at commit [`a92657d`](https://github.com/apache/spark/commit/a92657d815e7837a64d69546acc954a792ae1d1a). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r34757572 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala --- @@ -24,11 +24,15 @@ package org.apache.spark private[spark] trait ExecutorAllocationClient { /** - * Express a preference to the cluster manager for a given total number of executors. + * Express a preference to the cluster manager for a given total number of executors, + * number of locality aware pending tasks and related locality preferences. * This can result in canceling pending requests or filing additional requests. * @return whether the request is acknowledged by the cluster manager. */ - private[spark] def requestTotalExecutors(numExecutors: Int): Boolean + private[spark] def requestTotalExecutors( + numExecutors: Int, + localityAwarePendingTasks: Int, + preferredLocalityToCount: Map[String, Int]): Boolean --- End diff -- Actually I think the key string is hostname, not executor :). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
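A small illustration of the shape of that argument under the reviewer's reading; the host names and counts below are made up:

```scala
object LocalityPreferenceExample {
  // Keys are hostnames (not executor ids); values are how many pending
  // locality-aware tasks would prefer to run on that host.
  val preferredLocalityToCount: Map[String, Int] = Map(
    "host-1.example.com" -> 5,
    "host-2.example.com" -> 2
  )
}
```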
[GitHub] spark pull request: [SPARK-1855] Local checkpointing
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7279#issuecomment-121840800 [Test build #1080 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1080/consoleFull) for PR 7279 at commit [`a92657d`](https://github.com/apache/spark/commit/a92657d815e7837a64d69546acc954a792ae1d1a). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-1855] Local checkpointing
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7279#issuecomment-121840775 [Test build #1079 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1079/consoleFull) for PR 7279 at commit [`a92657d`](https://github.com/apache/spark/commit/a92657d815e7837a64d69546acc954a792ae1d1a). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9085][SQL] Remove LeafNode, UnaryNode, ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/7434#discussion_r34757444 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala --- @@ -277,15 +276,21 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { /** * A logical plan node with no children. */ -abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] { +abstract class LeafNode extends LogicalPlan { self: Product => --- End diff -- remove `self: Product =>`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9085][SQL] Remove LeafNode, UnaryNode, ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/7434#discussion_r34757450 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala --- @@ -277,15 +276,21 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { /** * A logical plan node with no children. */ -abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] { +abstract class LeafNode extends LogicalPlan { self: Product => + + override def children: Seq[LogicalPlan] = Nil } /** * A logical plan node with single child. */ -abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] { +abstract class UnaryNode extends LogicalPlan { self: Product => --- End diff -- remove `self: Product =>`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
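A self-contained sketch of what dropping the self-type would mean; `LogicalPlan` is stubbed here purely so the snippet compiles on its own:

```scala
// Stub standing in for catalyst's LogicalPlan, only so this example compiles.
abstract class LogicalPlanStub { def children: Seq[LogicalPlanStub] }

// Current form in the diff: the self-type forces subclasses to be Products.
abstract class LeafNodeWithSelfType extends LogicalPlanStub { self: Product =>
  override def children: Seq[LogicalPlanStub] = Nil
}

// Suggested form: concrete plan nodes are case classes, hence Products anyway,
// so the annotation can be dropped without losing anything.
abstract class LeafNodeStub extends LogicalPlanStub {
  override def children: Seq[LogicalPlanStub] = Nil
}

// A concrete node is a Product in either case.
case class ExampleRelation(name: String) extends LeafNodeStub
```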
[GitHub] spark pull request: [SPARK-7131] [ml] Copy Decision Tree, Random F...
Github user manishamde commented on the pull request: https://github.com/apache/spark/pull/7294#issuecomment-121840575 @jkbradley It looks good to me. It might be a good idea to run the spark.mllib and spark.ml models on a couple of datasets to ensure there are no regressions (in accuracy or performance). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9085][SQL] Remove LeafNode, UnaryNode, ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/7434#discussion_r34757416 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala --- @@ -96,7 +95,7 @@ case class UnresolvedFunction(name: String, children: Seq[Expression]) extends E * Represents all of the input attributes to a given relational operator, for example in * "SELECT * FROM ...". A [[Star]] gets automatically expanded during analysis. */ -trait Star extends NamedExpression with trees.LeafNode[Expression] { +abstract class Star extends LeafExpression with NamedExpression { self: Product => --- End diff -- as it's an abstract class now, can we remove `self: Product =>`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r34757342 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -872,6 +872,25 @@ class DAGScheduler( // will be posted, which should always come after a corresponding SparkListenerStageSubmitted // event. stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size)) +val taskIdToLocations = try { + stage match { +case s: ShuffleMapStage => + partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap +case s: ResultStage => + val job = s.resultOfJob.get + partitionsToCompute.map { id => +val p = job.partitions(id) +(id, getPreferredLocs(stage.rdd, p)) + }.toMap + } +} catch { + case NonFatal(e) => +abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}") +runningStages -= stage +return +} +stage.latestInfo.taskLocalityPreferences = Some(taskIdToLocations.values.toSeq) --- End diff -- What if you make taskLocalityPreferences the last parameter in the StageInfo constructor, and have it default to an empty list? That way you won't have to change the unit tests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
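A sketch of the suggested constructor shape, with a trimmed-down stand-in for StageInfo (the real class has more fields, and the preference type is simplified to strings here):

```scala
class StageInfoSketch(
    val stageId: Int,
    val name: String,
    val numTasks: Int,
    // New field placed last with a default, so existing constructor calls
    // (including those in unit tests) keep compiling unchanged.
    val taskLocalityPreferences: Seq[Seq[String]] = Seq.empty)

object StageInfoSketch {
  // Old call sites compile without modification:
  val withoutPrefs = new StageInfoSketch(0, "map stage", 4)
  // New call sites can pass the locality preferences explicitly:
  val withPrefs = new StageInfoSketch(1, "result stage", 2,
    taskLocalityPreferences = Seq(Seq("host-1"), Seq("host-2")))
}
```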
[GitHub] spark pull request: [SPARK-9085][SQL] Remove LeafNode, UnaryNode, ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/7434#discussion_r34757289 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -342,7 +342,7 @@ object ConstantFolding extends Rule[LogicalPlan] { case l: Literal => l // Fold expressions that are foldable. - case e if e.foldable => Literal.create(e.eval(null), e.dataType) + case e if e.foldable => Literal.create(e.eval(EmptyRow), e.dataType) --- End diff -- I remember we set a default parameter value for `eval`, so we can just write `e.eval()` here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
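A sketch of what a default argument on `eval` looks like, with stand-in types rather than the real catalyst ones:

```scala
trait RowStub
object EmptyRowStub extends RowStub

trait ExpressionStub {
  // With a default for `input`, foldable expressions can be evaluated as
  // `e.eval()` rather than `e.eval(EmptyRow)` or `e.eval(null)`.
  def eval(input: RowStub = EmptyRowStub): Any
}

case class LiteralStub(value: Any) extends ExpressionStub {
  override def eval(input: RowStub): Any = value
}

object EvalDefaultExample {
  val folded: Any = LiteralStub(42).eval()   // uses the default EmptyRowStub
}
```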
[GitHub] spark pull request: [SPARK-9085][SQL] Remove LeafNode, UnaryNode, ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7434#issuecomment-121840217 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9085][SQL] Remove LeafNode, UnaryNode, ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/7434#discussion_r34757259 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -40,13 +40,14 @@ import org.apache.spark.sql.types._ * requested. The attributes produced by this function will be automatically copied anytime rules * result in changes to the Generator or its children. */ -abstract class Generator extends Expression { - self: Product => +trait Generator extends Expression { self: Product => --- End diff -- how about: ```scala trait Generator { self: Expression => } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
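A sketch contrasting the two shapes being discussed, using stand-in names so it compiles on its own:

```scala
trait ExpressionStub { def dataType: String }

// Shape in the patch: the trait itself extends Expression.
trait GeneratorExtends extends ExpressionStub

// Shape proposed in the review: a self-type, so the trait can only be mixed
// into Expressions but is not an Expression by itself. It can still use
// Expression members through the self reference.
trait GeneratorSelfTyped { self: ExpressionStub =>
  def describe: String = s"generator producing $dataType"
}

case class ExplodeStub() extends ExpressionStub with GeneratorSelfTyped {
  override def dataType: String = "array"
}
```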
[GitHub] spark pull request: [SPARK-9085][SQL] Remove LeafNode, UnaryNode, ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7434#issuecomment-121840141 [Test build #37458 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37458/console) for PR 7434 at commit [`9e8a4de`](https://github.com/apache/spark/commit/9e8a4def6f02e03899fa2fafdd2841c513d280af). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute ` * `abstract class Star extends LeafExpression with NamedExpression ` * `case class UnresolvedAlias(child: Expression) extends UnaryExpression with NamedExpression ` * `case class SortOrder(child: Expression, direction: SortDirection) extends UnaryExpression ` * `trait AggregateExpression extends Expression ` * `trait PartialAggregate extends AggregateExpression ` * `case class Min(child: Expression) extends UnaryExpression with PartialAggregate ` * `case class Max(child: Expression) extends UnaryExpression with PartialAggregate ` * `case class Count(child: Expression) extends UnaryExpression with PartialAggregate ` * `case class Average(child: Expression) extends UnaryExpression with PartialAggregate ` * `case class Sum(child: Expression) extends UnaryExpression with PartialAggregate ` * `case class SumDistinct(child: Expression) extends UnaryExpression with PartialAggregate ` * `case class First(child: Expression) extends UnaryExpression with PartialAggregate ` * `case class Last(child: Expression) extends UnaryExpression with PartialAggregate ` * `trait Generator extends Expression ` * `case class Explode(child: Expression) extends UnaryExpression with Generator ` * `trait NamedExpression extends Expression ` * `abstract class Attribute extends LeafExpression with NamedExpression ` * `case class PrettyAttribute(name: String) extends Attribute ` * `abstract class LeafNode extends LogicalPlan ` * `abstract class UnaryNode extends LogicalPlan ` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9085][SQL] Remove LeafNode, UnaryNode, ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/7434#discussion_r34757251 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -40,13 +40,14 @@ import org.apache.spark.sql.types._ * requested. The attributes produced by this function will be automatically copied anytime rules * result in changes to the Generator or its children. */ -abstract class Generator extends Expression { - self: Product => +trait Generator extends Expression { self: Product => --- End diff -- how about: ```scala trait Generator { self: Expression => } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9085][SQL] Remove LeafNode, UnaryNode, ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/7434#discussion_r34757208 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala --- @@ -20,17 +20,21 @@ package org.apache.spark.sql.catalyst.expressions import com.clearspring.analytics.stream.cardinality.HyperLogLog import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.util.TypeUtils import org.apache.spark.sql.types._ import org.apache.spark.util.collection.OpenHashSet -abstract class AggregateExpression extends Expression { +trait AggregateExpression extends Expression { --- End diff -- how about: ```scala trait AggregateExpression { self: Expression => } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r34757199 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -872,6 +872,25 @@ class DAGScheduler( // will be posted, which should always come after a corresponding SparkListenerStageSubmitted // event. stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size)) +val taskIdToLocations = try { + stage match { +case s: ShuffleMapStage => + partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap +case s: ResultStage => + val job = s.resultOfJob.get + partitionsToCompute.map { id => +val p = job.partitions(id) +(id, getPreferredLocs(stage.rdd, p)) + }.toMap + } +} catch { + case NonFatal(e) => +abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}") +runningStages -= stage +return +} +stage.latestInfo.taskLocalityPreferences = Some(taskIdToLocations.values.toSeq) --- End diff -- I think if we move this field `taskLocalityPreferences` into `StageInfo`, we no longer need the `Option` and it is more consistent with the rest of the code, but the big concern is that we would need to change lots of places in the unit tests that use `StageInfo`. I'm not sure whether that is worthwhile. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7119][SQL]Give script a default serde w...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/6638#issuecomment-121839582 [Test build #37462 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37462/consoleFull) for PR 6638 at commit [`4ab11b7`](https://github.com/apache/spark/commit/4ab11b7e5df106993682aef7d4bc7759827734b6). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8791][SQL] Improve the InternalRow.hash...
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/7189#issuecomment-121839736 Thank you all for reviewing the code for me, but I think there would be a more general way to solve this, so I'm closing it for now. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8791][SQL] Improve the InternalRow.hash...
Github user chenghao-intel closed the pull request at: https://github.com/apache/spark/pull/7189 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7119][SQL]Give script a default serde w...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/6638#issuecomment-121838429 Merged build started. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7119][SQL]Give script a default serde w...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/6638#issuecomment-121838423 Merged build triggered. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9026] Refactor SimpleFutureAction.onCom...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7385#issuecomment-121838283 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9026] Refactor SimpleFutureAction.onCom...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7385#issuecomment-121838243 **[Test build #37442 timed out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37442/console)** for PR 7385 at commit [`c6fdc21`](https://github.com/apache/spark/commit/c6fdc2169f5bb8802b7b2d0019433de8bb0cae66) after a configured wait of `175m`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8964] [SQL] [WIP] Use Exchange to perfo...
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/7334#issuecomment-121838166 @JoshRosen, it seems `execution.CollectLimit` will eventually invoke code like the following (in SparkPlan.executeTake): ```scala sc.runJob(childRDD, (it: Iterator[InternalRow]) => it.take(left).toArray, p, allowLocal = false) ``` I am wondering whether `execution.CollectLimit(limit, planLater(child))` vs. `execution.Limit(global = true, limit, execution.Limit(global = false, limit, child))` are actually equal in data shuffling / copying; if so, we can probably simplify the code by removing `CollectLimit` and `ReturnAnswer`. Sorry if I missed something. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
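As a rough way to see that comparison, here is a plain-RDD sketch of a per-partition (local) limit followed by a single-partition (global) limit, next to a driver-side take. This is only an analogy for the physical operators being discussed, not their implementation:

```scala
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD

object LimitSketch {
  // Analogue of Limit(global = false): each partition keeps at most n rows.
  def localLimit[T: ClassTag](rdd: RDD[T], n: Int): RDD[T] =
    rdd.mapPartitions(_.take(n), preservesPartitioning = true)

  // Analogue of Limit(global = true) after shuffling everything to one partition.
  def globalLimit[T: ClassTag](rdd: RDD[T], n: Int): RDD[T] =
    localLimit(rdd, n).coalesce(1, shuffle = true).mapPartitions(_.take(n))

  // Analogue of a CollectLimit / executeTake style answer: rows are pulled
  // straight to the driver instead of going through a shuffle to one partition.
  def collectLimit[T: ClassTag](rdd: RDD[T], n: Int): Array[T] =
    rdd.take(n)
}
```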
[GitHub] spark pull request: [SPARK-9068][SQL] refactor the implicit type c...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/7420 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9068][SQL] refactor the implicit type c...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/7420#issuecomment-121834153 LGTM - @marmbrus and I can't really think of why we needed it - maybe we don't need it anymore. Going to merge this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7412#issuecomment-121834064 Build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8968] [SQL] shuffled by the partition c...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7336#issuecomment-121834098 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8968] [SQL] shuffled by the partition c...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7336#issuecomment-121834028 [Test build #37459 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37459/console) for PR 7336 at commit [`b5ada0a`](https://github.com/apache/spark/commit/b5ada0ab4944661c8ab6bf030006d111657d13e6). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7412#issuecomment-121833931 [Test build #37452 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37452/console) for PR 7412 at commit [`095aa3a`](https://github.com/apache/spark/commit/095aa3a390446205a4d7b7ed1fbce46f2c93). * This patch **fails Spark unit tests**. * This patch **does not merge cleanly**. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8682][SQL][WIP] Range Join
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7379#issuecomment-121833017 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8682][SQL][WIP] Range Join
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7379#issuecomment-121832849 [Test build #37456 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37456/console) for PR 7379 at commit [`8204eae`](https://github.com/apache/spark/commit/8204eaed1b9399f17415afc6ce178c845f29746f). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `case class BroadcastRangeJoin(` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9068][SQL] refactor the implicit type c...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/7420#discussion_r34756205 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala --- @@ -747,15 +742,9 @@ object HiveTypeCoercion { case (StringType, BinaryType) => Cast(e, BinaryType) case (any, StringType) if any != StringType => Cast(e, StringType) -// Type collection. -// First see if we can find our input type in the type collection. If we can, then just -// use the current expression; otherwise, find the first one we can implicitly cast. -case (_, TypeCollection(types)) => - if (types.exists(_.isSameType(inType))) { -e - } else { -types.flatMap(implicitCast(e, _)).headOption.orNull - } +// When we reach here, input type is not acceptable for any types in this type collection, +// try to find the first one we can implicitly cast. +case (_, TypeCollection(types)) => types.flatMap(implicitCast(e, _)).headOption.orNull --- End diff -- the first rule is: `case _ if expectedType.acceptsType(inType) => e`. So when we reach here, input type is not acceptable for any types in this type collection. see my comments above. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
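A toy sketch of that rule ordering, with made-up types and cast rules, just to show why the `TypeCollection` case can assume the input was not already accepted:

```scala
object ImplicitCastSketch {
  sealed trait AbstractType
  case object IntType extends AbstractType
  case object LongType extends AbstractType
  case object StringType extends AbstractType
  case class TypeCollection(types: Seq[AbstractType]) extends AbstractType

  // Made-up single-step coercions for the sake of the example.
  private def canCast(from: AbstractType, to: AbstractType): Boolean =
    (from, to) match {
      case (IntType, LongType) => true
      case (_, StringType)     => true
      case _                   => false
    }

  def implicitCast(in: AbstractType, expected: AbstractType): Option[AbstractType] =
    expected match {
      // First rule: the expected type already accepts the input as is.
      case t if t == in                          => Some(in)
      case TypeCollection(ts) if ts.contains(in) => Some(in)
      // Reached only when nothing in the collection accepted the input:
      // fall back to the first member we can implicitly cast to.
      case TypeCollection(ts)                    => ts.find(canCast(in, _))
      case t if canCast(in, t)                   => Some(t)
      case _                                     => None
    }
}
```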
[GitHub] spark pull request: [SPARK-8995][SQL] cast date strings like '2015...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7353#issuecomment-121830731 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r34756031 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -872,6 +872,25 @@ class DAGScheduler( // will be posted, which should always come after a corresponding SparkListenerStageSubmitted // event. stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size)) +val taskIdToLocations = try { + stage match { +case s: ShuffleMapStage => + partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap +case s: ResultStage => + val job = s.resultOfJob.get + partitionsToCompute.map { id => +val p = job.partitions(id) +(id, getPreferredLocs(stage.rdd, p)) + }.toMap + } +} catch { + case NonFatal(e) => +abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}") +runningStages -= stage +return +} +stage.latestInfo.taskLocalityPreferences = Some(taskIdToLocations.values.toSeq) --- End diff -- Thanks @kayousterhout , that's a good idea, I will change the code accordingly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8995][SQL] cast date strings like '2015...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7353#issuecomment-121830665 [Test build #37454 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37454/console) for PR 7353 at commit [`ca1ae69`](https://github.com/apache/spark/commit/ca1ae69c1baa7d4d14946bdd2638aec47e05be86). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7119][SQL]Give script a default serde w...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/6638#issuecomment-121827741 [Test build #25 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SlowSparkPullRequestBuilder/25/console) for PR 6638 at commit [`2ee0488`](https://github.com/apache/spark/commit/2ee048825ad79a6a533ead969752b435af92166a). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7119][SQL]Give script a default serde w...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/6638#issuecomment-121827751 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9058][SQL] Split projectionCode if it i...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7418#issuecomment-121827685 [Test build #37461 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37461/consoleFull) for PR 7418 at commit [`12d3794`](https://github.com/apache/spark/commit/12d3794b009a90d21de9a1d52d4f3ea9503f2b58). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8245][SQL] FormatNumber/Length Support ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/7034 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9058][SQL] Split projectionCode if it i...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7418#issuecomment-121827618 Merged build started. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9058][SQL] Split projectionCode if it i...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7418#issuecomment-121827606 Merged build triggered. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8245][SQL] FormatNumber/Length Support ...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/7034#issuecomment-121827562 Thanks - merging this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8125] [SQL] Accelerates Parquet schema ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7396#issuecomment-121827555 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8125] [SQL] Accelerates Parquet schema ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7396#issuecomment-121827505 [Test build #37441 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37441/console) for PR 7396 at commit [`f122f10`](https://github.com/apache/spark/commit/f122f1070fb08cd737a42f683f7f8d1bb7f4a4ad). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * ` case class FakeFileStatus(` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9018][MLLIB] add stopwatches
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/7415#discussion_r34755425 --- Diff: mllib/src/test/scala/org/apache/spark/ml/util/StopwatchSuite.scala --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.util + +import org.apache.spark.SparkFunSuite +import org.apache.spark.mllib.util.MLlibTestSparkContext + +class StopwatchSuite extends SparkFunSuite with MLlibTestSparkContext { + + private def testStopwatchOnDriver(sw: Stopwatch): Unit = { +assert(sw.name === "sw") +assert(sw.elapsed() === 0L) +assert(!sw.isRunning) +intercept[AssertionError] { + sw.stop() +} +sw.start() +Thread.sleep(50) +val duration = sw.stop() +assert(duration >= 50 && duration < 100) // using a loose upper bound +val elapsed = sw.elapsed() +assert(elapsed === duration) +sw.start() +Thread.sleep(50) +val duration2 = sw.stop() +assert(duration2 >= 50 && duration2 < 100) +val elapsed2 = sw.elapsed() +assert(elapsed2 == duration + duration2) --- End diff -- Should we no longer bother with this? Or is it just for Longs (in which case enforcing consistency may be easiest)? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
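For reference, a minimal local stopwatch with the accumulation behavior that assertion relies on; this is only a sketch, not the spark.ml Stopwatch implementation under review:

```scala
class LocalStopwatchSketch(val name: String) {
  private var running = false
  private var startTime = 0L
  private var accumulated = 0L

  def isRunning: Boolean = running

  def start(): Unit = {
    assert(!running, s"start() called on already-running stopwatch $name")
    running = true
    startTime = System.currentTimeMillis()
  }

  def stop(): Long = {
    assert(running, s"stop() called on stopped stopwatch $name")
    val duration = System.currentTimeMillis() - startTime
    accumulated += duration
    running = false
    duration            // duration of this start/stop cycle only
  }

  // Total across all completed cycles, so after two cycles this equals
  // duration1 + duration2, which is what the test asserts.
  def elapsed(): Long = accumulated
}
```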
[GitHub] spark pull request: [SPARK-5989] [MLlib] Model save/load for LDA
Github user jkbradley commented on the pull request: https://github.com/apache/spark/pull/6948#issuecomment-121827311 @MechCoder If you want to manually inspect the Parquet files, reading them back in using Spark SQL is probably the easiest way. You could also save them instead as JSON with a single partition, which might make it easier to read the files. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
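A sketch of both inspection routes using the DataFrame API; `sc` is assumed to be an existing SparkContext and the paths are placeholders:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object InspectSavedModel {
  def inspect(sc: SparkContext): Unit = {
    val sqlContext = SQLContext.getOrCreate(sc)

    // Route 1: read the saved Parquet data back in and look at it directly.
    val df = sqlContext.read.parquet("/tmp/lda-model/data")
    df.printSchema()
    df.show()

    // Route 2: rewrite it as a single JSON part file, which is easy to open
    // in a text editor.
    df.repartition(1).write.json("/tmp/lda-model-json")
  }
}
```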
[GitHub] spark pull request: [SPARK-7119][SQL]Give script a default serde w...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/6638#issuecomment-121827335 [Test build #37451 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37451/console) for PR 6638 at commit [`2ee0488`](https://github.com/apache/spark/commit/2ee048825ad79a6a533ead969752b435af92166a). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-5989] [MLlib] Model save/load for LDA
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/6948#discussion_r34755380 --- Diff: mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala --- @@ -213,6 +214,46 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext { } } + test("model save/load") { +// Test for LocalLDAModel. +val localModel = new LocalLDAModel(tinyTopics) +val tempDir1 = Utils.createTempDir() +val path1 = tempDir1.toURI.toString + +// Test for DistributedLDAModel. +val k = 3 +val topicSmoothing = 1.2 +val termSmoothing = 1.2 --- End diff -- use different topic,term smoothing values for testing save/load. I'd also call them doc/topic concentration for clarity. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7119][SQL]Give script a default serde w...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/6638#issuecomment-121827344 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-5989] [MLlib] Model save/load for LDA
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/6948#discussion_r34755377 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala --- @@ -354,4 +445,140 @@ class DistributedLDAModel private ( // TODO: // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ??? + override protected def formatVersion = "1.0" + + override def save(sc: SparkContext, path: String): Unit = { +DistributedLDAModel.SaveLoadV1_0.save( + sc, path, graph, globalTopicTotals, k, vocabSize, docConcentration, topicConcentration, + iterationTimes) + } +} + + +@Experimental +object DistributedLDAModel extends Loader[DistributedLDAModel]{ + + + object SaveLoadV1_0 { + +val thisFormatVersion = "1.0" + +val classNameV1_0 = "org.apache.spark.mllib.clustering.DistributedLDAModel" + +// Store the weight of each topic separately in a row. --- End diff -- update doc --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-5989] [MLlib] Model save/load for LDA
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/6948#discussion_r34755357

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala ---
@@ -184,6 +199,82 @@ class LocalLDAModel private[clustering] (
 }
+@Experimental
+object LocalLDAModel extends Loader[LocalLDAModel] {
+
+  private object SaveLoadV1_0 {
+
+    val thisFormatVersion = "1.0"
+
+    val thisClassName = "org.apache.spark.mllib.clustering.LocalLDAModel"
+
+    // Store the distribution of terms of each topic as a Row in data.
+    case class Data(topic: Vector)
+
+    def save(sc: SparkContext, path: String, topicsMatrix: Matrix): Unit = {
+
+      val sqlContext = new SQLContext(sc)
--- End diff --

Use ```SQLContext.getOrCreate(sc)``` here instead of ```new SQLContext(sc)```.
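A rough sketch of the change being asked for; the method body is abbreviated, and the ```SaveSketch``` wrapper exists only so the fragment stands on its own:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object SaveSketch {
  def save(sc: SparkContext, path: String): Unit = {
    // Reuse the SQLContext already associated with this SparkContext (creating
    // one only if none exists yet) instead of constructing a second instance.
    val sqlContext = SQLContext.getOrCreate(sc)
    import sqlContext.implicits._
    // ... build the metadata and topics DataFrames and write them out as before ...
  }
}
```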
[GitHub] spark pull request: [SPARK-5989] [MLlib] Model save/load for LDA
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/6948#discussion_r34755364

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala ---
@@ -184,6 +199,82 @@ class LocalLDAModel private[clustering] (
 }
+@Experimental
+object LocalLDAModel extends Loader[LocalLDAModel] {
+
+  private object SaveLoadV1_0 {
+
+    val thisFormatVersion = "1.0"
+
+    val thisClassName = "org.apache.spark.mllib.clustering.LocalLDAModel"
+
+    // Store the distribution of terms of each topic as a Row in data.
+    case class Data(topic: Vector)
+
+    def save(sc: SparkContext, path: String, topicsMatrix: Matrix): Unit = {
+
+      val sqlContext = new SQLContext(sc)
+      import sqlContext.implicits._
+
+      val k = topicsMatrix.numCols
+      val metadata = compact(render
+        (("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
+          ("k" -> k) ~ ("vocabSize" -> topicsMatrix.numRows)))
+      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+
+      val topicsDenseMatrix = topicsMatrix.toBreeze.toDenseMatrix
+      val topics = Range(0, k).map { topicInd =>
+        Data(Vectors.dense((topicsDenseMatrix(::, topicInd).toArray)))
+      }.toSeq
+      sc.parallelize(topics, 1).toDF().write.parquet(Loader.dataPath(path))
+    }
+
+    def load(sc: SparkContext, path: String): LocalLDAModel = {
+
+      val dataPath = Loader.dataPath(path)
+      val sqlContext = SQLContext.getOrCreate(sc)
+      val dataFrame = sqlContext.read.parquet(dataPath)
+
+      Loader.checkSchema[Data](dataFrame.schema)
+      val topics = dataFrame.collect()
+      val vocabSize = topics(0)(0).asInstanceOf[Vector].size
--- End diff --

Use ```getAs[Vector](0)``` instead of ```(0).asInstanceOf[Vector]```.
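A small sketch of the typed accessor being suggested; ```LoadSketch``` and ```vocabSizeOf``` are hypothetical names, and ```dataFrame``` stands for the Parquet data read in ```load```:

```scala
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.{DataFrame, Row}

object LoadSketch {
  def vocabSizeOf(dataFrame: DataFrame): Int = {
    val topics: Array[Row] = dataFrame.collect()
    // Row.getAs[T](i) gives the same value as topics(0)(0).asInstanceOf[Vector]
    // but reads better and fails with a clearer error if column 0 is not a Vector.
    topics(0).getAs[Vector](0).size
  }
}
```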
[GitHub] spark pull request: [SPARK-5989] [MLlib] Model save/load for LDA
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/6948#discussion_r34755378 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala --- @@ -354,4 +445,140 @@ class DistributedLDAModel private ( // TODO: // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ??? + override protected def formatVersion = "1.0" + + override def save(sc: SparkContext, path: String): Unit = { +DistributedLDAModel.SaveLoadV1_0.save( + sc, path, graph, globalTopicTotals, k, vocabSize, docConcentration, topicConcentration, + iterationTimes) + } +} + + +@Experimental +object DistributedLDAModel extends Loader[DistributedLDAModel]{ + + + object SaveLoadV1_0 { + +val thisFormatVersion = "1.0" + +val classNameV1_0 = "org.apache.spark.mllib.clustering.DistributedLDAModel" + +// Store the weight of each topic separately in a row. +case class Data(globalTopicTotals: Vector) + +// Store each term and document vertex with an id and the topicWeights. +case class VertexData(id: Long, topicWeights: Vector) + +// Store each edge with the source id, destination id and tokenCounts. +case class EdgeData(srcId: Long, dstId: Long, tokenCounts: Double) + +def save( +sc: SparkContext, +path: String, +graph: Graph[LDA.TopicCounts, LDA.TokenCount], +globalTopicTotals: LDA.TopicCounts, +k: Int, +vocabSize: Int, +docConcentration: Double, +topicConcentration: Double, +iterationTimes: Array[Double]): Unit = { + + val sqlContext = SQLContext.getOrCreate(sc) + import sqlContext.implicits._ + + val metadata = compact(render +(("class" -> classNameV1_0) ~ ("version" -> thisFormatVersion) ~ + ("k" -> k) ~ ("vocabSize" -> vocabSize) ~ ("docConcentration" -> docConcentration) ~ + ("topicConcentration" -> topicConcentration) ~ + ("iterationTimes" -> iterationTimes.toSeq))) + sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + + val newPath = new Path(Loader.dataPath(path), "globalTopicTotals").toUri.toString + sc.parallelize(Seq(Data(Vectors.fromBreeze(globalTopicTotals.toDF() +.write.parquet(newPath) + + val verticesPath = new Path(Loader.dataPath(path), "topicCounts").toUri.toString + graph.vertices.map { case (ind, vertex) => +VertexData(ind, Vectors.fromBreeze(vertex)) + }.toDF().write.parquet(verticesPath) + + val edgesPath = new Path(Loader.dataPath(path), "tokenCounts").toUri.toString + graph.edges.map { case Edge(srcId, dstId, prop) => +EdgeData(srcId, dstId, prop) + }.toDF().write.parquet(edgesPath) +} + +def load( +sc: SparkContext, +path: String, +vocabSize: Int, +docConcentration: Double, +topicConcentration: Double, +iterationTimes: Array[Double]): DistributedLDAModel = { + + val dataPath = new Path(Loader.dataPath(path), "globalTopicTotals").toUri.toString + val vertexDataPath = new Path(Loader.dataPath(path), "topicCounts").toUri.toString + val edgeDataPath = new Path(Loader.dataPath(path), "tokenCounts").toUri.toString + val sqlContext = new SQLContext(sc) --- End diff -- Use SQLContext.getOrCreate --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-5989] [MLlib] Model save/load for LDA
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/6948#discussion_r34755376

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala ---
@@ -354,4 +445,140 @@ class DistributedLDAModel private (
   // TODO:
   // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ???
+  override protected def formatVersion = "1.0"
+
+  override def save(sc: SparkContext, path: String): Unit = {
+    DistributedLDAModel.SaveLoadV1_0.save(
+      sc, path, graph, globalTopicTotals, k, vocabSize, docConcentration, topicConcentration,
+      iterationTimes)
+  }
+}
+
+
+@Experimental
+object DistributedLDAModel extends Loader[DistributedLDAModel]{
+
+
+  object SaveLoadV1_0 {
--- End diff --

Make this object private, as the corresponding ```SaveLoadV1_0``` in ```LocalLDAModel``` is.
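For reference, why the change is safe: a private member of a companion object stays visible to its companion class, so the existing call from ```save``` keeps compiling. A self-contained illustration with hypothetical names (```Model``` stands in for the real classes):

```scala
class Model(val version: String) {
  // The companion class may call the companion object's private members,
  // so save() still compiles after SaveLoadV1_0 is made private.
  def save(path: String): Unit = Model.SaveLoadV1_0.save(this, path)
}

object Model {
  private object SaveLoadV1_0 {
    def save(model: Model, path: String): Unit =
      println(s"saving model version ${model.version} to $path")
  }
}
```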
[GitHub] spark pull request: [SPARK-5989] [MLlib] Model save/load for LDA
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/6948#discussion_r34755369 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala --- @@ -184,6 +199,82 @@ class LocalLDAModel private[clustering] ( } +@Experimental +object LocalLDAModel extends Loader[LocalLDAModel] { + + private object SaveLoadV1_0 { + +val thisFormatVersion = "1.0" + +val thisClassName = "org.apache.spark.mllib.clustering.LocalLDAModel" + +// Store the distribution of terms of each topic as a Row in data. +case class Data(topic: Vector) + +def save(sc: SparkContext, path: String, topicsMatrix: Matrix): Unit = { + + val sqlContext = new SQLContext(sc) + import sqlContext.implicits._ + + val k = topicsMatrix.numCols + val metadata = compact(render +(("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ + ("k" -> k) ~ ("vocabSize" -> topicsMatrix.numRows))) + sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + + val topicsDenseMatrix = topicsMatrix.toBreeze.toDenseMatrix + val topics = Range(0, k).map { topicInd => +Data(Vectors.dense((topicsDenseMatrix(::, topicInd).toArray))) + }.toSeq + sc.parallelize(topics, 1).toDF().write.parquet(Loader.dataPath(path)) +} + +def load(sc: SparkContext, path: String): LocalLDAModel = { + + val dataPath = Loader.dataPath(path) + val sqlContext = SQLContext.getOrCreate(sc) + val dataFrame = sqlContext.read.parquet(dataPath) + + Loader.checkSchema[Data](dataFrame.schema) + val topics = dataFrame.collect() + val vocabSize = topics(0)(0).asInstanceOf[Vector].size + val k = topics.size + + val brzTopics = BDM.zeros[Double](vocabSize, k) + topics.zipWithIndex.foreach { case (Row(vec: Vector), ind: Int) => +brzTopics(::, ind) := vec.toBreeze + } + new LocalLDAModel(Matrices.fromBreeze(brzTopics)) +} + } + + override def load(sc: SparkContext, path: String): LocalLDAModel = { + +val (loadedClassName, loadedVersion, metadata) = Loader.loadMetadata(sc, path) +implicit val formats = DefaultFormats +val expectedK = (metadata \ "k").extract[Int] +val expectedVocabSize = (metadata \ "vocabSize").extract[Int] +val classNameV1_0 = SaveLoadV1_0.thisClassName + +val model = (loadedClassName, loadedVersion) match { + case (className, "1.0") if className == classNameV1_0 => +SaveLoadV1_0.load(sc, path) + case _ => throw new Exception( +s"LocalLDAModel.load did not recognize model with (className, format version):" + +s"($loadedClassName, $loadedVersion). Supported:\n" + +s" ($classNameV1_0, 1.0)") +} + +val topicsMatrix = model.topicsMatrix +require(expectedK == topicsMatrix.numCols, + s"LocalLDAModel requires $expectedK topics, got $topicsMatrix.numCols topics") --- End diff -- Need braces around "topicsMatrix.numCols" Please check for this issue everywhere --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
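The pitfall behind this comment, in isolation: in Scala string interpolation, ```s"...$topicsMatrix.numCols..."``` substitutes only ```topicsMatrix``` and then appends the literal text ```.numCols```. The matrix below is a stand-in built with ```Matrices.zeros```, purely for illustration:

```scala
import org.apache.spark.mllib.linalg.Matrices

val expectedK = 4
val topicsMatrix = Matrices.zeros(10, 4)  // 10 terms x 4 topics, stand-in for the loaded model

// Without braces: interpolates topicsMatrix.toString, then appends ".numCols" literally.
val wrong = s"LocalLDAModel requires $expectedK topics, got $topicsMatrix.numCols topics"

// With braces: evaluates the whole expression, as intended.
require(expectedK == topicsMatrix.numCols,
  s"LocalLDAModel requires $expectedK topics, got ${topicsMatrix.numCols} topics")
```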
[GitHub] spark pull request: [SPARK-5989] [MLlib] Model save/load for LDA
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/6948#discussion_r34755375

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala ---
@@ -354,4 +445,140 @@ class DistributedLDAModel private (
   // TODO:
   // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ???
+  override protected def formatVersion = "1.0"
+
+  override def save(sc: SparkContext, path: String): Unit = {
+    DistributedLDAModel.SaveLoadV1_0.save(
+      sc, path, graph, globalTopicTotals, k, vocabSize, docConcentration, topicConcentration,
+      iterationTimes)
+  }
+}
+
+
+@Experimental
+object DistributedLDAModel extends Loader[DistributedLDAModel]{
+
--- End diff --

Remove the extra newline.
[GitHub] spark pull request: [SPARK-5989] [MLlib] Model save/load for LDA
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/6948#discussion_r34755379 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala --- @@ -354,4 +445,140 @@ class DistributedLDAModel private ( // TODO: // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ??? + override protected def formatVersion = "1.0" + + override def save(sc: SparkContext, path: String): Unit = { +DistributedLDAModel.SaveLoadV1_0.save( + sc, path, graph, globalTopicTotals, k, vocabSize, docConcentration, topicConcentration, + iterationTimes) + } +} + + +@Experimental +object DistributedLDAModel extends Loader[DistributedLDAModel]{ + + + object SaveLoadV1_0 { + +val thisFormatVersion = "1.0" + +val classNameV1_0 = "org.apache.spark.mllib.clustering.DistributedLDAModel" + +// Store the weight of each topic separately in a row. +case class Data(globalTopicTotals: Vector) + +// Store each term and document vertex with an id and the topicWeights. +case class VertexData(id: Long, topicWeights: Vector) + +// Store each edge with the source id, destination id and tokenCounts. +case class EdgeData(srcId: Long, dstId: Long, tokenCounts: Double) + +def save( +sc: SparkContext, +path: String, +graph: Graph[LDA.TopicCounts, LDA.TokenCount], +globalTopicTotals: LDA.TopicCounts, +k: Int, +vocabSize: Int, +docConcentration: Double, +topicConcentration: Double, +iterationTimes: Array[Double]): Unit = { + + val sqlContext = SQLContext.getOrCreate(sc) + import sqlContext.implicits._ + + val metadata = compact(render +(("class" -> classNameV1_0) ~ ("version" -> thisFormatVersion) ~ + ("k" -> k) ~ ("vocabSize" -> vocabSize) ~ ("docConcentration" -> docConcentration) ~ + ("topicConcentration" -> topicConcentration) ~ + ("iterationTimes" -> iterationTimes.toSeq))) + sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + + val newPath = new Path(Loader.dataPath(path), "globalTopicTotals").toUri.toString + sc.parallelize(Seq(Data(Vectors.fromBreeze(globalTopicTotals.toDF() +.write.parquet(newPath) + + val verticesPath = new Path(Loader.dataPath(path), "topicCounts").toUri.toString + graph.vertices.map { case (ind, vertex) => +VertexData(ind, Vectors.fromBreeze(vertex)) + }.toDF().write.parquet(verticesPath) + + val edgesPath = new Path(Loader.dataPath(path), "tokenCounts").toUri.toString + graph.edges.map { case Edge(srcId, dstId, prop) => +EdgeData(srcId, dstId, prop) + }.toDF().write.parquet(edgesPath) +} + +def load( +sc: SparkContext, +path: String, +vocabSize: Int, +docConcentration: Double, +topicConcentration: Double, +iterationTimes: Array[Double]): DistributedLDAModel = { + + val dataPath = new Path(Loader.dataPath(path), "globalTopicTotals").toUri.toString + val vertexDataPath = new Path(Loader.dataPath(path), "topicCounts").toUri.toString + val edgeDataPath = new Path(Loader.dataPath(path), "tokenCounts").toUri.toString + val sqlContext = new SQLContext(sc) + val dataFrame = sqlContext.read.parquet(dataPath) + val vertexDataFrame = sqlContext.read.parquet(vertexDataPath) + val edgeDataFrame = sqlContext.read.parquet(edgeDataPath) + + Loader.checkSchema[Data](dataFrame.schema) + Loader.checkSchema[VertexData](vertexDataFrame.schema) + Loader.checkSchema[EdgeData](edgeDataFrame.schema) + val globalTopicTotals: LDA.TopicCounts = +dataFrame.first().getAs[Vector](0).toBreeze.toDenseVector + val vertices: RDD[(VertexId, LDA.TopicCounts)] = vertexDataFrame.map { +case Row(ind: Long, vec: Vector) => (ind, 
vec.toBreeze.toDenseVector) + } + + val edges: RDD[Edge[LDA.TokenCount]] = edgeDataFrame.map { +case Row(srcId: Long, dstId: Long, prop: Double) => Edge(srcId, dstId, prop) + } + val graph: Graph[LDA.TopicCounts, LDA.TokenCount] = Graph(vertices, edges) + + new DistributedLDAModel( --- End diff -- style: put arguments on this line too. We only use this style for method headers, not method calls.
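The style distinction the comment draws, sketched with hypothetical helpers (```saveModel``` and ```buildAndSave``` are not real Spark APIs): the one-parameter-per-line, 4-space-indented layout is reserved for method declarations, while call sites keep arguments on the same line and wrap with a normal continuation indent.

```scala
import org.apache.spark.SparkContext

object CallStyleExample {
  // Hypothetical helper, only here to give the call site something to call.
  def saveModel(sc: SparkContext, path: String, vocabSize: Int, overwrite: Boolean): Unit = ()

  // Method declaration: one parameter per line, indented 4 spaces, is fine.
  def buildAndSave(
      sc: SparkContext,
      path: String,
      vocabSize: Int): Unit = {
    // Method call: arguments start on the same line as the call and wrap
    // with a regular continuation indent.
    saveModel(sc, path, vocabSize,
      overwrite = true)
  }
}
```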
[GitHub] spark pull request: [SPARK-5989] [MLlib] Model save/load for LDA
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/6948#discussion_r34755372

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala ---
@@ -354,4 +445,140 @@ class DistributedLDAModel private (
   // TODO:
   // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ???
+  override protected def formatVersion = "1.0"
+
+  override def save(sc: SparkContext, path: String): Unit = {
+    DistributedLDAModel.SaveLoadV1_0.save(
+      sc, path, graph, globalTopicTotals, k, vocabSize, docConcentration, topicConcentration,
+      iterationTimes)
+  }
+}
+
+
+@Experimental
+object DistributedLDAModel extends Loader[DistributedLDAModel]{
--- End diff --

Need a space before the opening brace. Please check for style issues like this everywhere.
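Purely for illustration, the whitespace fix being asked for on the object declaration (the body is elided):

```scala
// Before: object DistributedLDAModel extends Loader[DistributedLDAModel]{
// After (space before the opening brace):
object DistributedLDAModel extends Loader[DistributedLDAModel] {
  // ... save/load members unchanged ...
}
```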