[GitHub] spark pull request: [SPARK-2075][Core] Make the compiler generate ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3740#issuecomment-67608353

[Test build #24635 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24635/consoleFull) for PR 3740 at commit [`734bac9`](https://github.com/apache/spark/commit/734bac96340f1d82584aa0bcba67d3f60e09c39d).
* This patch merges cleanly.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2075][Core] Make the compiler generate ...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/3740#discussion_r22093377

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1174,6 +1174,14 @@ abstract class RDD[T: ClassTag](
    * Save this RDD as a text file, using string representations of elements.
    */
   def saveAsTextFile(path: String) {
+    // https://issues.apache.org/jira/browse/SPARK-2075
+    // NullWritable is a Comparable rather than Comparable[NullWritable] in Hadoop 1.+,
+    // so the compiler cannot find an implicit Ordering for it. It will generate different
+    // anonymous classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+. Therefore, here we
+    // provide an Ordering for NullWritable so that the compiler will generate the same code.
+    implicit val nullWritableOrdering = new Ordering[NullWritable] {
+      override def compare(x: NullWritable, y: NullWritable): Int = 0
+    }
     this.map(x => (NullWritable.get(), new Text(x.toString)))
       .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
--- End diff --

Right. An explicit solution is better for such a tricky issue.
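For readers without a Hadoop build handy, here is a self-contained sketch of the same idea. `Token` is a hypothetical stand-in for `NullWritable`: a type for which the compiler cannot summon an `Ordering` on its own, so one is supplied explicitly, just as the diff above does.

```scala
// Token is a hypothetical stand-in for NullWritable: the compiler has no
// way to derive an Ordering[Token] by itself.
final class Token

// Supplying the Ordering explicitly, as the diff does for NullWritable,
// makes implicit resolution deterministic instead of depending on which
// Comparable signature the class happens to expose on a given Hadoop line.
implicit val tokenOrdering: Ordering[Token] = new Ordering[Token] {
  // All Tokens compare equal, mirroring NullWritable's singleton semantics.
  override def compare(x: Token, y: Token): Int = 0
}

// With the implicit in scope, Ordering-requiring code compiles and runs:
val sorted = List(new Token, new Token).sorted
println(sorted.length) // 2
```

Because the explicit implicit always wins over any compiler-derived one, the anonymous classes generated for the surrounding method no longer differ between builds.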
[GitHub] spark pull request: Add role and checkpoint support for Mesos back...
Github user tnachen commented on the pull request: https://github.com/apache/spark/pull/60#issuecomment-67607923

Seems like the original author is no longer going to work on this, so I'll take this up again.
[GitHub] spark pull request: Add role and checkpoint support for Mesos back...
Github user tdna commented on the pull request: https://github.com/apache/spark/pull/60#issuecomment-67607842

For anyone running Mesos with multiple frameworks (like us), this is a very important feature. Could somebody please file an issue in JIRA?
[GitHub] spark pull request: [SPARK-4094][CORE] checkpoint should still be ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2956#issuecomment-67607694

[Test build #24629 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24629/consoleFull) for PR 2956 at commit [`be7c1fa`](https://github.com/apache/spark/commit/be7c1fae8deb2922b276fca2c46f747c2cdb05f1).
* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: [SPARK-4094][CORE] checkpoint should still be ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2956#issuecomment-67607704

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24629/
[GitHub] spark pull request: [SPARK-4883][Shuffle] Add a name to the direct...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3734#issuecomment-67607387

[Test build #24634 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24634/consoleFull) for PR 3734 at commit [`cc74727`](https://github.com/apache/spark/commit/cc747279b74c156ad30d549d71a1f4508b1beebd).
* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4749] [mllib]: Allow initializing KMean...
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/3610#issuecomment-67607352

LGTM except minor inline comments.
[GitHub] spark pull request: [SPARK-4883][Shuffle] Add a name to the direct...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/3734#discussion_r22092977

--- Diff: network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java ---
@@ -60,8 +61,11 @@
   private final TransportConf conf;

   public ExternalShuffleBlockManager(TransportConf conf) {
-    // TODO: Give this thread a name.
-    this(conf, Executors.newSingleThreadExecutor());
+    this(conf, Executors.newSingleThreadExecutor(
--- End diff --

Agreed. Using `NettyUtils.createThreadFactory` now.
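A minimal sketch of what naming that executor's thread involves. The helper `namedThreadFactory` and the `"shuffle-server"` prefix are illustrative assumptions; the actual patch reuses a helper from Spark's `NettyUtils` instead.

```scala
import java.util.concurrent.{Callable, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

// Builds threads named "<prefix>-1", "<prefix>-2", ... so they are easy to
// spot in thread dumps. Illustrative helper, not Spark's actual one.
def namedThreadFactory(prefix: String): ThreadFactory = new ThreadFactory {
  private val counter = new AtomicInteger(0)
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
    t.setDaemon(true) // housekeeping work should not block JVM exit
    t
  }
}

val executor = Executors.newSingleThreadExecutor(namedThreadFactory("shuffle-server"))
val name = executor.submit(new Callable[String] {
  override def call(): String = Thread.currentThread().getName
}).get()
println(name) // shuffle-server-1
executor.shutdown()
```

Without a factory, `Executors.newSingleThreadExecutor()` produces generic `pool-N-thread-1` names, which is exactly what the removed TODO complained about.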
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092935

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.clustering
+
+import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix}
+import breeze.linalg.Transpose
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors}
+import org.apache.spark.mllib.stat.impl.MultivariateGaussian
+
+import scala.collection.mutable.IndexedSeqView
+
+/**
+ * This class performs expectation maximization for multivariate Gaussian
+ * Mixture Models (GMMs). A GMM represents a composite distribution of
+ * independent Gaussian distributions with associated "mixing" weights
+ * specifying each's contribution to the composite.
+ *
+ * Given a set of sample points, this class will maximize the log-likelihood
+ * for a mixture of k Gaussians, iterating until the log-likelihood changes by
+ * less than convergenceTol, or until it has reached the max number of iterations.
+ * While this process is generally guaranteed to converge, it is not guaranteed
+ * to find a global optimum.
+ *
+ * @param k The number of independent Gaussians in the mixture model
+ * @param convergenceTol The maximum change in log-likelihood at which convergence
+ * is considered to have occurred.
+ * @param maxIterations The maximum number of iterations to perform
+ */
+class GaussianMixtureModelEM private (
+    private var k: Int,
+    private var convergenceTol: Double,
+    private var maxIterations: Int) extends Serializable {
+
+  // Type aliases for convenience
+  private type DenseDoubleVector = BreezeVector[Double]
+  private type DenseDoubleMatrix = BreezeMatrix[Double]
+  private type VectorArrayView = IndexedSeqView[DenseDoubleVector, Array[DenseDoubleVector]]
+
+  private type ExpectationSum = (
+    Array[Double], // log-likelihood in index 0
+    Array[Double], // array of weights
+    Array[DenseDoubleVector], // array of means
+    Array[DenseDoubleMatrix]) // array of cov matrices
+
+  // create a zero'd ExpectationSum instance
+  private def zeroExpectationSum(k: Int, d: Int): ExpectationSum = {
+    (Array(0.0),
+     new Array[Double](k),
+     (0 until k).map(_ => BreezeVector.zeros[Double](d)).toArray,
--- End diff --

`Array.fill(k)(BDV.zeros[Double](d))`
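The suggested `Array.fill(k)(...)` re-evaluates its by-name argument once per slot, which is exactly what initializing k independent zero accumulators needs. A dependency-free sketch, with plain `Array[Double]` standing in for Breeze's `BDV.zeros`:

```scala
val k = 3
val d = 4

// Array.fill evaluates its second argument k times, so every slot gets its
// own zero buffer rather than k aliases of a single shared one.
val means = Array.fill(k)(Array.fill(d)(0.0))

means(0)(0) = 1.0
println(means(1)(0)) // 0.0 -- the other slots are untouched
```

This matters for `ExpectationSum`: the per-cluster means and covariances are mutated in place during aggregation, so accidental aliasing would corrupt every cluster at once.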
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092959

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala ---
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092957

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala ---
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092963

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.impl
+
+import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix}
+import breeze.linalg.{Transpose, det, pinv}
+
+/**
+ * Utility class to implement the density function for multivariate Gaussian distribution.
+ * Breeze provides this functionality, but it requires the Apache Commons Math library,
+ * so this class is here so as to not introduce a new dependency in Spark.
+ */
+private[mllib] class MultivariateGaussian(
+    val mu: BreezeVector[Double],
+    val sigma: BreezeMatrix[Double]) extends Serializable {
+  private val sigmaInv2 = pinv(sigma) * -0.5
+  private val U = math.pow(2.0 * math.Pi, -mu.length / 2.0) * math.pow(det(sigma), -0.5)
+
+  def pdf(x: BreezeVector[Double]): Double = {
--- End diff --

Need doc.
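To make the quoted `pdf` concrete, here is a dependency-free sketch specialized to one dimension, where the pseudo-inverse and determinant reduce to scalar operations (the name `pdf1d` is an assumption for illustration):

```scala
// One-dimensional Gaussian density:
//   pdf(x) = (2 * pi * sigma2)^(-1/2) * exp(-(x - mu)^2 / (2 * sigma2))
// This mirrors the class above, with U = (2*pi*sigma2)^(-1/2) playing the
// role of the normalizing constant and -0.5 / sigma2 the role of sigmaInv2.
def pdf1d(x: Double, mu: Double, sigma2: Double): Double =
  math.pow(2.0 * math.Pi * sigma2, -0.5) *
    math.exp(-0.5 * (x - mu) * (x - mu) / sigma2)

println(pdf1d(0.0, 0.0, 1.0)) // about 0.3989, i.e. 1 / sqrt(2 * pi)
```

Precomputing the normalizing constant and the scaled inverse covariance in the constructor, as the quoted class does, avoids repeating the expensive `pinv` and `det` on every `pdf` call.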
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092960

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala ---
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092939

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala ---
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092937

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.clustering
+
+import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix}
+import breeze.linalg.Transpose
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors}
+import org.apache.spark.mllib.stat.impl.MultivariateGaussian
+
+import scala.collection.mutable.IndexedSeqView
+
+/**
+ * This class performs expectation maximization for multivariate Gaussian
+ * Mixture Models (GMMs). A GMM represents a composite distribution of
+ * independent Gaussian distributions with associated "mixing" weights
+ * specifying each one's contribution to the composite.
+ *
+ * Given a set of sample points, this class will maximize the log-likelihood
+ * for a mixture of k Gaussians, iterating until the log-likelihood changes by
+ * less than convergenceTol, or until it has reached the max number of iterations.
+ * While this process is generally guaranteed to converge, it is not guaranteed
+ * to find a global optimum.
+ *
+ * @param k The number of independent Gaussians in the mixture model
+ * @param convergenceTol The maximum change in log-likelihood at which convergence
+ *                       is considered to have occurred.
+ * @param maxIterations The maximum number of iterations to perform
+ */
+class GaussianMixtureModelEM private (
+    private var k: Int,
+    private var convergenceTol: Double,
+    private var maxIterations: Int) extends Serializable {
+
+  // Type aliases for convenience
+  private type DenseDoubleVector = BreezeVector[Double]
+  private type DenseDoubleMatrix = BreezeMatrix[Double]
+  private type VectorArrayView = IndexedSeqView[DenseDoubleVector, Array[DenseDoubleVector]]
+
+  private type ExpectationSum = (
+    Array[Double],            // log-likelihood in index 0
+    Array[Double],            // array of weights
+    Array[DenseDoubleVector], // array of means
+    Array[DenseDoubleMatrix]) // array of cov matrices
+
+  // create a zero'd ExpectationSum instance
+  private def zeroExpectationSum(k: Int, d: Int): ExpectationSum = {
+    (Array(0.0),
+      new Array[Double](k),
+      (0 until k).map(_ => BreezeVector.zeros[Double](d)).toArray,
+      (0 until k).map(_ => BreezeMatrix.zeros[Double](d, d)).toArray)
+  }
+
+  // add two ExpectationSum objects (allowed to modify m1)
+  // (U, U) => U for aggregation
+  private def addExpectationSums(m1: ExpectationSum, m2: ExpectationSum): ExpectationSum = {
--- End diff --

See my previous comments about `ExpectationSum`.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
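The expectation step in the quoted diff computes, for each sample point, a responsibility `p(i) = eps + weights(i) * dists(i).pdf(x)` normalized by the sum over components, and accumulates `log(pSum)` into the running log-likelihood. A stripped-down illustration in plain Scala (univariate Gaussians and bare `Array[Double]`s stand in for the Breeze-based multivariate version in the PR, so the names below are hypothetical):

```scala
object EStep {
  // small constant guarding against log(0)/divide-by-zero,
  // mirroring the `eps` used in the quoted diff
  val eps = 1e-10

  // pdf of a univariate Gaussian; stands in for MultivariateGaussian.pdf
  def pdf(mu: Double, sigma2: Double)(x: Double): Double =
    math.exp(-(x - mu) * (x - mu) / (2 * sigma2)) / math.sqrt(2 * math.Pi * sigma2)

  // responsibilities of each mixture component for point x,
  // plus the point's contribution to the log-likelihood
  def responsibilities(
      weights: Array[Double],
      mus: Array[Double],
      sigma2s: Array[Double],
      x: Double): (Array[Double], Double) = {
    val p = Array.tabulate(weights.length) { i =>
      eps + weights(i) * pdf(mus(i), sigma2s(i))(x)
    }
    val pSum = p.sum
    var i = 0
    while (i < p.length) { p(i) /= pSum; i += 1 }  // normalize in place
    (p, math.log(pSum))
  }
}
```

After normalization the responsibilities sum to 1, which is what lets them be accumulated directly into the per-cluster weight sums.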
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092942

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala ---
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092953

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala ---
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092956

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala ---
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092938

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala ---
+    val xxt = x * new Transpose(x)
+    var i = 0
+    while (i < k) {
+      p(i) /= pSum
+      sums._2(i) += p(i)
+      sums._3(i) += x * p(i)
+      sums._4(i) += xxt * p(i)
--- End diff --

minor: The implementation in this block allocates unnecessary temp memory. For example, this is a rank-1 update: computing `xxt` allocates an unnecessary temporary matrix. We can use `BLAS.dsyr` instead. Another optimization we can
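To illustrate the suggested optimization: `sums._4(i) += xxt * p(i)` is a symmetric rank-1 update `A += alpha * x * xᵀ`, which can be applied in place without ever materializing `xxt`. A minimal plain-Scala sketch (a flat column-major array stands in for a Breeze matrix; MLlib would delegate to a BLAS `dsyr` routine rather than hand-rolled loops):

```scala
object Rank1Update {
  // In-place symmetric rank-1 update: a += alpha * x * x^T,
  // where `a` is an n x n matrix stored column-major in a flat array.
  // No temporary n x n matrix (the `xxt` in the quoted diff) is allocated.
  def syr(alpha: Double, x: Array[Double], a: Array[Double]): Unit = {
    val n = x.length
    var j = 0
    while (j < n) {
      val axj = alpha * x(j)
      var i = 0
      while (i < n) {
        a(j * n + i) += axj * x(i)
        i += 1
      }
      j += 1
    }
  }
}
```

With netlib-java (which Breeze can delegate to), the corresponding call would be along the lines of `blas.dsyr("U", n, alpha, x, 1, a, n)`, which touches only the upper triangle and does roughly half the work.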
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092962

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala ---
+package org.apache.spark.mllib.stat.impl
+
+import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix}
+import breeze.linalg.{Transpose, det, pinv}
--- End diff --

Group the imports into a single closure.
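For illustration, the two `breeze.linalg` import lines quoted above can be merged into one selector clause:

```scala
import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix, Transpose, det, pinv}
```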
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092955

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala ---
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092934

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala ---
+  private type ExpectationSum = (
+    Array[Double],            // log-likelihood in index 0
+    Array[Double],            // array of weights
+    Array[DenseDoubleVector], // array of means
+    Array[DenseDoubleMatrix]) // array of cov matrices
--- End diff --

The code could be more readable if we define `ExpectationSum` as a private class with `var loglik`, `weights`, `mean`, and `cov`, then implement an `add` method. As a result, we don't use `._1`, `._2`, etc. in the code.
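A sketch of that refactoring, with plain `Array[Double]`s standing in for the Breeze vector and matrix types (field and method names follow the comment but are otherwise illustrative):

```scala
// Replaces the 4-tuple type alias: each field is named, and the
// (U, U) => U combiner for RDD.aggregate becomes a method.
class ExpectationSum(
    var loglik: Double,
    val weights: Array[Double],
    val means: Array[Array[Double]],   // one mean vector per cluster
    val covs: Array[Array[Double]]) {  // one flattened cov matrix per cluster

  // Merge `other` into this sum in place and return this,
  // mirroring addExpectationSums (which was allowed to modify m1).
  def add(other: ExpectationSum): ExpectationSum = {
    loglik += other.loglik
    var i = 0
    while (i < weights.length) {
      weights(i) += other.weights(i)
      var j = 0
      while (j < means(i).length) { means(i)(j) += other.means(i)(j); j += 1 }
      j = 0
      while (j < covs(i).length) { covs(i)(j) += other.covs(i)(j); j += 1 }
      i += 1
    }
    this
  }
}

object ExpectationSum {
  // zero'd instance for k clusters in d dimensions, replacing zeroExpectationSum
  def zero(k: Int, d: Int): ExpectationSum =
    new ExpectationSum(
      0.0,
      new Array[Double](k),
      Array.fill(k)(new Array[Double](d)),
      Array.fill(k)(new Array[Double](d * d)))
}
```

Because `add` mutates and returns its receiver, it can be passed directly as both the sequence and combiner operator of an aggregation, exactly as the tuple-based helpers were.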
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092952 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix} +import breeze.linalg.Transpose + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors} +import org.apache.spark.mllib.stat.impl.MultivariateGaussian + +import scala.collection.mutable.IndexedSeqView + +/** + * This class performs expectation maximization for multivariate Gaussian + * Mixture Models (GMMs). A GMM represents a composite distribution of + * independent Gaussian distributions with associated "mixing" weights + * specifying each's contribution to the composite. + * + * Given a set of sample points, this class will maximize the log-likelihood + * for a mixture of k Gaussians, iterating until the log-likelihood changes by + * less than convergenceTol, or until it has reached the max number of iterations. 
+ * While this process is generally guaranteed to converge, it is not guaranteed + * to find a global optimum. + * + * @param k The number of independent Gaussians in the mixture model + * @param convergenceTol The maximum change in log-likelihood at which convergence + * is considered to have occurred. + * @param maxIterations The maximum number of iterations to perform + */ +class GaussianMixtureModelEM private ( +private var k: Int, +private var convergenceTol: Double, +private var maxIterations: Int) extends Serializable { + + // Type aliases for convenience + private type DenseDoubleVector = BreezeVector[Double] + private type DenseDoubleMatrix = BreezeMatrix[Double] + private type VectorArrayView = IndexedSeqView[DenseDoubleVector, Array[DenseDoubleVector]] + + private type ExpectationSum = ( +Array[Double], // log-likelihood in index 0 +Array[Double], // array of weights +Array[DenseDoubleVector], // array of means +Array[DenseDoubleMatrix]) // array of cov matrices + + // create a zero'd ExpectationSum instance + private def zeroExpectationSum(k: Int, d: Int): ExpectationSum = { +(Array(0.0), + new Array[Double](k), + (0 until k).map(_ => BreezeVector.zeros[Double](d)).toArray, + (0 until k).map(_ => BreezeMatrix.zeros[Double](d,d)).toArray) + } + + // add two ExpectationSum objects (allowed to use modify m1) + // (U, U) => U for aggregation + private def addExpectationSums(m1: ExpectationSum, m2: ExpectationSum): ExpectationSum = { +m1._1(0) += m2._1(0) +var i = 0 +while (i < m1._2.length) { + m1._2(i) += m2._2(i) + m1._3(i) += m2._3(i) + m1._4(i) += m2._4(i) + i = i + 1 +} +m1 + } + + // compute cluster contributions for each input point + // (U, T) => U for aggregation + private def computeExpectation( + weights: Array[Double], + dists: Array[MultivariateGaussian]) + (sums: ExpectationSum, x: DenseDoubleVector): ExpectationSum = { +val k = sums._2.length +val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(x) } +val pSum = 
p.sum +sums._1(0) += math.log(pSum) +val xxt = x * new Transpose(x) +var i = 0 +while (i < k) { + p(i) /= pSum + sums._2(i) += p(i) + sums._3(i) += x * p(i) + sums._4(i) += xxt * p(i) + i = i + 1 +} +sums + } + + // number of samples per cluster to use when initializing Gaussians + private val nSamples = 5 + + // an initializing GMM can be provided rather than using the
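The E-step quoted above (`computeExpectation`) floors each weighted density with `eps`, normalizes the results into per-cluster responsibilities, and accumulates them into the `ExpectationSum` tuple (log-likelihood, weight sums, mean sums, covariance sums). A minimal Python sketch of that logic, reduced to scalar (1-D) data for brevity — the function and variable names here are illustrative, not from the PR:

```python
import math

def compute_expectation(weights, pdfs, sums, x, eps=2.0 ** -52):
    """One E-step contribution for a single scalar point x.

    `sums` mirrors the ExpectationSum tuple in the quoted Scala:
    (log_likelihood, weight_sums, mean_sums, sq_sums), scalar case only.
    """
    log_lik, w_sums, m_sums, s_sums = sums
    # eps floors each density so the normalizer can never be zero for outliers
    p = [eps + w * pdf(x) for w, pdf in zip(weights, pdfs)]
    p_sum = sum(p)
    log_lik += math.log(p_sum)
    for i in range(len(p)):
        p[i] /= p_sum            # responsibility of cluster i for x
        w_sums[i] += p[i]
        m_sums[i] += p[i] * x
        s_sums[i] += p[i] * x * x  # scalar analogue of p(i) * x * x^T
    return (log_lik, w_sums, m_sums, s_sums)
```

Because the accumulator is merged by element-wise addition (`addExpectationSums`), the `(U, T) => U` / `(U, U) => U` pair noted in the quoted comments is associative and commutative, which is what an `RDD.aggregate`-style reduction requires.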
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092954 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala --- (diff context identical to the previous comment)
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092927 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix} +import breeze.linalg.Transpose + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors} +import org.apache.spark.mllib.stat.impl.MultivariateGaussian + +import scala.collection.mutable.IndexedSeqView + +/** + * This class performs expectation maximization for multivariate Gaussian + * Mixture Models (GMMs). A GMM represents a composite distribution of + * independent Gaussian distributions with associated "mixing" weights + * specifying each's contribution to the composite. + * + * Given a set of sample points, this class will maximize the log-likelihood + * for a mixture of k Gaussians, iterating until the log-likelihood changes by + * less than convergenceTol, or until it has reached the max number of iterations. 
+ * While this process is generally guaranteed to converge, it is not guaranteed + * to find a global optimum. + * + * @param k The number of independent Gaussians in the mixture model + * @param convergenceTol The maximum change in log-likelihood at which convergence + * is considered to have occurred. + * @param maxIterations The maximum number of iterations to perform + */ +class GaussianMixtureModelEM private ( +private var k: Int, +private var convergenceTol: Double, +private var maxIterations: Int) extends Serializable { + + // Type aliases for convenience + private type DenseDoubleVector = BreezeVector[Double] --- End diff -- Do these aliases simplify any code? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: Small refactoring to pass SparkEnv into Execut...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3738#issuecomment-67607055 [Test build #24633 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24633/consoleFull) for PR 3738 at commit [`82e02cc`](https://github.com/apache/spark/commit/82e02cce503bba56b728fd0eeab6baa2a8b91ed6). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092924 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix} +import breeze.linalg.Transpose + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors} +import org.apache.spark.mllib.stat.impl.MultivariateGaussian + +import scala.collection.mutable.IndexedSeqView --- End diff -- organize imports into groups: https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092941 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala --- (diff context identical to the previous comment)
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092918 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BreezeVector} + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.Matrix +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.stat.impl.MultivariateGaussian + +/** + * Multivariate Gaussian Mixture Model (GMM) consisting of k Gaussians, where points + * are drawn from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are + * the respective mean and covariance for each Gaussian distribution i=1..k. 
+ * + * @param weight Weights for each Gaussian distribution in the mixture, where mu(i) is + * the weight for Gaussian i, and weight.sum == 1 + * @param mu Means for each Gaussian in the mixture, where mu(i) is the mean for Gaussian i + * @param sigma Covariance maxtrix for each Gaussian in the mixture, where sigma(i) is the + * covariance matrix for Gaussian i + */ +class GaussianMixtureModel( + val weight: Array[Double], + val mu: Array[Vector], + val sigma: Array[Matrix]) extends Serializable { + + /** Number of gaussians in mixture */ + def k: Int = weight.length + + /** Maps given points to their cluster indices. */ + def predict(points: RDD[Vector]): (RDD[Array[Double]],RDD[Int]) = { +val responsibilityMatrix = predictMembership(points,mu,sigma,weight,k) --- End diff -- space after each `,` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
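The `predict` method quoted above derives a hard cluster label from each row of the responsibility matrix by taking the index of its largest entry (`r.indexOf(r.max)`). A small Python sketch of that argmax step, assuming plain lists of memberships (`predict_labels` is an illustrative name, not part of the PR):

```python
def predict_labels(responsibility_rows):
    """Map each row of soft memberships to the index of its largest entry,
    as the quoted Scala does with r.indexOf(r.max)."""
    return [row.index(max(row)) for row in responsibility_rows]
```

Like the Scala version, ties resolve to the first (lowest-index) maximal entry.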
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092908 --- Diff: examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.mllib.clustering.GaussianMixtureModelEM +import org.apache.spark.mllib.linalg.Vectors + +/** + * An example Gaussian Mixture Model EM app. Run with + * {{{ + * ./bin/run-example org.apache.spark.examples.mllib.DenseGmmEM + * }}} + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ +object DenseGmmEM { + def main(args: Array[String]): Unit = { +if (args.length != 3) { + println("usage: DenseGmmEM ") +} else { + run(args(0), args(1).toInt, args(2).toDouble) +} + } + + private def run(inputFile: String, k: Int, convergenceTol: Double) { +val conf = new SparkConf().setAppName("Spark EM Sample") --- End diff -- Gaussian Mixture Model EM example. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092933 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix} +import breeze.linalg.Transpose + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors} +import org.apache.spark.mllib.stat.impl.MultivariateGaussian + +import scala.collection.mutable.IndexedSeqView + +/** + * This class performs expectation maximization for multivariate Gaussian + * Mixture Models (GMMs). A GMM represents a composite distribution of + * independent Gaussian distributions with associated "mixing" weights + * specifying each's contribution to the composite. + * + * Given a set of sample points, this class will maximize the log-likelihood + * for a mixture of k Gaussians, iterating until the log-likelihood changes by + * less than convergenceTol, or until it has reached the max number of iterations. 
+ * While this process is generally guaranteed to converge, it is not guaranteed + * to find a global optimum. + * + * @param k The number of independent Gaussians in the mixture model + * @param convergenceTol The maximum change in log-likelihood at which convergence + * is considered to have occurred. + * @param maxIterations The maximum number of iterations to perform + */ +class GaussianMixtureModelEM private ( +private var k: Int, +private var convergenceTol: Double, +private var maxIterations: Int) extends Serializable { + + // Type aliases for convenience + private type DenseDoubleVector = BreezeVector[Double] + private type DenseDoubleMatrix = BreezeMatrix[Double] + private type VectorArrayView = IndexedSeqView[DenseDoubleVector, Array[DenseDoubleVector]] --- End diff -- `IndexedSeq` should be sufficient. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092923 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BreezeVector} + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.Matrix +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.stat.impl.MultivariateGaussian + +/** + * Multivariate Gaussian Mixture Model (GMM) consisting of k Gaussians, where points + * are drawn from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are + * the respective mean and covariance for each Gaussian distribution i=1..k. 
+ * + * @param weight Weights for each Gaussian distribution in the mixture, where mu(i) is + * the weight for Gaussian i, and weight.sum == 1 + * @param mu Means for each Gaussian in the mixture, where mu(i) is the mean for Gaussian i + * @param sigma Covariance maxtrix for each Gaussian in the mixture, where sigma(i) is the + * covariance matrix for Gaussian i + */ +class GaussianMixtureModel( + val weight: Array[Double], + val mu: Array[Vector], + val sigma: Array[Matrix]) extends Serializable { + + /** Number of gaussians in mixture */ + def k: Int = weight.length + + /** Maps given points to their cluster indices. */ + def predict(points: RDD[Vector]): (RDD[Array[Double]],RDD[Int]) = { +val responsibilityMatrix = predictMembership(points,mu,sigma,weight,k) +val clusterLabels = responsibilityMatrix.map(r => r.indexOf(r.max)) +(responsibilityMatrix, clusterLabels) + } + + /** + * Given the input vectors, return the membership value of each vector + * to all mixture components. + */ + def predictMembership( + points: RDD[Vector], + mu: Array[Vector], + sigma: Array[Matrix], + weight: Array[Double], k: Int): RDD[Array[Double]] = { +val sc = points.sparkContext +val dists = sc.broadcast{ + (0 until k).map{ i => +new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix) + }.toArray +} +val weights = sc.broadcast(weight) +points.map{ x => + computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k) +} + } + + // We use "eps" as the minimum likelihood density for any given point + // in every cluster; this prevents any divide by zero conditions for + // outlier points. + private val eps = math.pow(2.0, -52) --- End diff -- EPS is defined in `MLUtils.EPSILON`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
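The `eps = 2^-52` discussed in the comment above acts as a floor on each weighted density so the normalizing sum can never be zero, even for a far outlier whose densities all underflow to 0.0. A hedged Python sketch of that normalization (`soft_assignments` is an illustrative name, not an MLlib API):

```python
def soft_assignments(weights, densities, eps=2.0 ** -52):
    """Normalize weighted densities into soft memberships; the eps floor
    keeps the denominator nonzero, preventing the divide-by-zero the
    quoted comment on `eps` describes."""
    p = [eps + w * d for w, d in zip(weights, densities)]
    total = sum(p)
    return [v / total for v in p]
```

With all densities at zero the floor makes the memberships uniform, which is a reasonable fallback for a point no component explains.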
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092920 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BreezeVector} + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.Matrix +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.stat.impl.MultivariateGaussian + +/** + * Multivariate Gaussian Mixture Model (GMM) consisting of k Gaussians, where points + * are drawn from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are + * the respective mean and covariance for each Gaussian distribution i=1..k. 
+ * + * @param weight Weights for each Gaussian distribution in the mixture, where mu(i) is + * the weight for Gaussian i, and weight.sum == 1 + * @param mu Means for each Gaussian in the mixture, where mu(i) is the mean for Gaussian i + * @param sigma Covariance maxtrix for each Gaussian in the mixture, where sigma(i) is the + * covariance matrix for Gaussian i + */ +class GaussianMixtureModel( + val weight: Array[Double], + val mu: Array[Vector], + val sigma: Array[Matrix]) extends Serializable { + + /** Number of gaussians in mixture */ + def k: Int = weight.length + + /** Maps given points to their cluster indices. */ + def predict(points: RDD[Vector]): (RDD[Array[Double]],RDD[Int]) = { +val responsibilityMatrix = predictMembership(points,mu,sigma,weight,k) +val clusterLabels = responsibilityMatrix.map(r => r.indexOf(r.max)) +(responsibilityMatrix, clusterLabels) + } + + /** + * Given the input vectors, return the membership value of each vector + * to all mixture components. + */ + def predictMembership( + points: RDD[Vector], + mu: Array[Vector], + sigma: Array[Matrix], + weight: Array[Double], k: Int): RDD[Array[Double]] = { --- End diff -- move `k: Int) ...` to a new line --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092926 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix} +import breeze.linalg.Transpose + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors} +import org.apache.spark.mllib.stat.impl.MultivariateGaussian + +import scala.collection.mutable.IndexedSeqView + +/** + * This class performs expectation maximization for multivariate Gaussian + * Mixture Models (GMMs). A GMM represents a composite distribution of + * independent Gaussian distributions with associated "mixing" weights + * specifying each's contribution to the composite. + * + * Given a set of sample points, this class will maximize the log-likelihood + * for a mixture of k Gaussians, iterating until the log-likelihood changes by + * less than convergenceTol, or until it has reached the max number of iterations. 
+ * While this process is generally guaranteed to converge, it is not guaranteed + * to find a global optimum. + * + * @param k The number of independent Gaussians in the mixture model + * @param convergenceTol The maximum change in log-likelihood at which convergence + * is considered to have occurred. + * @param maxIterations The maximum number of iterations to perform + */ +class GaussianMixtureModelEM private ( --- End diff -- minor: GMM's name contains `Model`, which is a little confusing: `GaussianMixtureModelEM` produces `GaussianMixtureModel`. I don't have good suggestions. Maybe we could rename `GaussianMixtureModelEM` to `GaussianMixtureEM`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092915 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.Matrix +import org.apache.spark.mllib.linalg.Vector + +/** + * Multivariate Gaussian Mixture Model (GMM) consisting of k Gaussians, where points + * are drawn from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are + * the respective mean and covariance for each Gaussian distribution i=1..k. + * + * @param weight Weights for each Gaussian distribution in the mixture, where mu(i) is + * the weight for Gaussian i, and weight.sum == 1 + * @param mu Means for each Gaussian in the mixture, where mu(i) is the mean for Gaussian i + * @param sigma Covariance maxtrix for each Gaussian in the mixture, where sigma(i) is the + * covariance matrix for Gaussian i + */ +class GaussianMixtureModel( + val weight: Array[Double], --- End diff -- +1 on @jkbradley 's suggestion, which we can do in a follow-up PR. 
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092921 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BreezeVector} + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.Matrix +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.stat.impl.MultivariateGaussian + +/** + * Multivariate Gaussian Mixture Model (GMM) consisting of k Gaussians, where points + * are drawn from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are + * the respective mean and covariance for each Gaussian distribution i=1..k. 
+ * + * @param weight Weights for each Gaussian distribution in the mixture, where mu(i) is + * the weight for Gaussian i, and weight.sum == 1 + * @param mu Means for each Gaussian in the mixture, where mu(i) is the mean for Gaussian i + * @param sigma Covariance maxtrix for each Gaussian in the mixture, where sigma(i) is the + * covariance matrix for Gaussian i + */ +class GaussianMixtureModel( + val weight: Array[Double], + val mu: Array[Vector], + val sigma: Array[Matrix]) extends Serializable { + + /** Number of gaussians in mixture */ + def k: Int = weight.length + + /** Maps given points to their cluster indices. */ + def predict(points: RDD[Vector]): (RDD[Array[Double]],RDD[Int]) = { +val responsibilityMatrix = predictMembership(points,mu,sigma,weight,k) +val clusterLabels = responsibilityMatrix.map(r => r.indexOf(r.max)) +(responsibilityMatrix, clusterLabels) + } + + /** + * Given the input vectors, return the membership value of each vector + * to all mixture components. + */ + def predictMembership( + points: RDD[Vector], + mu: Array[Vector], + sigma: Array[Matrix], + weight: Array[Double], k: Int): RDD[Array[Double]] = { +val sc = points.sparkContext +val dists = sc.broadcast{ --- End diff -- The downside is that every time `predictMembership` is called we need to re-broadcast `mu`, `sigma`, and `weights`. Because we have already broadcast the RDD closure since 1.1, it is not necessary to do this unless we could reuse the broadcast objects. Btw, if we add a method to `GaussianMixtureModel` that can predict an individual instance, this could be done by broadcasting the entire model, which is about the same performance.
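The re-broadcast cost described above can be sketched without Spark. In the sketch below, `buildDists` stands in for constructing and broadcasting the per-component parameters (`mu`, `sigma`, `weights`); all names are invented for illustration, none are Spark APIs.

```scala
// Plain-Scala sketch (no Spark) of the reuse concern: if the distribution
// objects are rebuilt (re-broadcast) on every predictMembership call, the
// setup cost is paid repeatedly; caching them, as broadcasting the whole
// model once would do, pays it a single time.
object BroadcastReuseSketch {
  var buildCount = 0
  // Stands in for constructing and broadcasting `mu`, `sigma`, and `weights`.
  def buildDists(): Vector[String] = { buildCount += 1; Vector("g0", "g1") }

  // Naive: rebuilds on every prediction call.
  def predictNaive(): Vector[String] = buildDists()

  // Reusing: builds once, lazily, and shares the result across calls.
  lazy val cachedDists: Vector[String] = buildDists()
  def predictReusing(): Vector[String] = cachedDists

  def main(args: Array[String]): Unit = {
    predictNaive(); predictNaive()     // two builds
    predictReusing(); predictReusing() // one more build, shared thereafter
    println(buildCount) // prints 3
  }
}
```

The `lazy val` plays the role of a broadcast variable that is created once and reused; the naive path models re-broadcasting on each call.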
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092909 --- Diff: examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.mllib.clustering.GaussianMixtureModelEM +import org.apache.spark.mllib.linalg.Vectors + +/** + * An example Gaussian Mixture Model EM app. Run with + * {{{ + * ./bin/run-example org.apache.spark.examples.mllib.DenseGmmEM + * }}} + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. 
+ */ +object DenseGmmEM { + def main(args: Array[String]): Unit = { +if (args.length != 3) { + println("usage: DenseGmmEM <input file> <k> <convergenceTol>") +} else { + run(args(0), args(1).toInt, args(2).toDouble) +} + } + + private def run(inputFile: String, k: Int, convergenceTol: Double) { +val conf = new SparkConf().setAppName("Spark EM Sample") +val ctx = new SparkContext(conf) + +val data = ctx.textFile(inputFile).map{ line => --- End diff -- space before `{` (please also update others)
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092917 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BreezeVector} + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.Matrix +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.stat.impl.MultivariateGaussian + +/** + * Multivariate Gaussian Mixture Model (GMM) consisting of k Gaussians, where points + * are drawn from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are + * the respective mean and covariance for each Gaussian distribution i=1..k. 
+ * + * @param weight Weights for each Gaussian distribution in the mixture, where mu(i) is + * the weight for Gaussian i, and weight.sum == 1 + * @param mu Means for each Gaussian in the mixture, where mu(i) is the mean for Gaussian i + * @param sigma Covariance maxtrix for each Gaussian in the mixture, where sigma(i) is the + * covariance matrix for Gaussian i + */ +class GaussianMixtureModel( + val weight: Array[Double], + val mu: Array[Vector], + val sigma: Array[Matrix]) extends Serializable { + + /** Number of gaussians in mixture */ + def k: Int = weight.length + + /** Maps given points to their cluster indices. */ + def predict(points: RDD[Vector]): (RDD[Array[Double]],RDD[Int]) = { --- End diff -- space after `,` (please also update others) Is it simpler if we only return `RDD[Array[Double]]` and let users compute the best cluster? Another solution is to let `predict` return `RDD[Int]` and add `predictRaw` return `RDD[Array[Double]]`. Btw, this issue could be easily addressed by the new pipeline API, where we can put two columns and compute them on-demand. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-4156 [MLLIB] EM algorithm for GMMs
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/3022#discussion_r22092919 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BreezeVector} + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.Matrix +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.stat.impl.MultivariateGaussian + +/** + * Multivariate Gaussian Mixture Model (GMM) consisting of k Gaussians, where points + * are drawn from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are + * the respective mean and covariance for each Gaussian distribution i=1..k. 
+ * + * @param weight Weights for each Gaussian distribution in the mixture, where mu(i) is + * the weight for Gaussian i, and weight.sum == 1 + * @param mu Means for each Gaussian in the mixture, where mu(i) is the mean for Gaussian i + * @param sigma Covariance maxtrix for each Gaussian in the mixture, where sigma(i) is the + * covariance matrix for Gaussian i + */ +class GaussianMixtureModel( + val weight: Array[Double], + val mu: Array[Vector], + val sigma: Array[Matrix]) extends Serializable { + + /** Number of gaussians in mixture */ + def k: Int = weight.length + + /** Maps given points to their cluster indices. */ + def predict(points: RDD[Vector]): (RDD[Array[Double]],RDD[Int]) = { +val responsibilityMatrix = predictMembership(points,mu,sigma,weight,k) +val clusterLabels = responsibilityMatrix.map(r => r.indexOf(r.max)) +(responsibilityMatrix, clusterLabels) + } + + /** + * Given the input vectors, return the membership value of each vector + * to all mixture components. + */ + def predictMembership( --- End diff -- Should it be a private method inside `object GaussianMixtureModel`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2075][Core] Make the compiler generate ...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/3740#discussion_r22092697 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1174,6 +1174,14 @@ abstract class RDD[T: ClassTag]( * Save this RDD as a text file, using string representations of elements. */ def saveAsTextFile(path: String) { +// https://issues.apache.org/jira/browse/SPARK-2075 +// NullWritable is a Comparable rather than Comparable[NullWritable] in Hadoop 1.+, +// so the compiler cannot find an implicit Ordering for it. It will generate different +// anonymous classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+. Therefore, here we +// provide an Ordering for NullWritable so that the compiler will generate same codes. +implicit val nullWritableOrdering = new Ordering[NullWritable] { + override def compare(x: NullWritable, y: NullWritable): Int = 0 +} this.map(x => (NullWritable.get(), new Text(x.toString))) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) --- End diff -- Is the problem here that while compiling Hadoop 2, the compiler chooses to specify the Ordering on the implicit rddToPairRDDFunctions, while in Hadoop 1 it instead uses the default method (`return null`) to invoke the implicit? I wonder if a more explicit solution, like the introduction of a conversion to PairRDDFunctions which takes an Ordering, is warranted for these cases. e.g.: ```scala this.map(x => (NullWritable.get(), new Text(x.toString))) .toPairRDD(nullWritableOrdering) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) ``` This would be less magical about why the definition of an implicit Ordering changes the bytecode.
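The question above turns on a Scala-level mechanism: Spark's `rddToPairRDDFunctions` takes an implicit `Ordering` with a `null` default, so the call site scalac emits depends on whether an implicit instance is in scope. A minimal, Hadoop-free sketch of that mechanism (all names below are illustrative, not Spark's):

```scala
// Demonstrates that a defaulted implicit parameter is filled differently
// depending on what is in implicit scope at the call site, which is why the
// generated anonymous classes differ between the Hadoop 1 and Hadoop 2 builds.
object ImplicitOrderingDemo {
  // Mirrors the shape of Spark's conversion: an implicit Ordering that
  // falls back to a null default when no instance can be found.
  def needsOrdering[K](k: K)(implicit ord: Ordering[K] = null): Boolean =
    ord != null

  class Key // no Ordering[Key] exists by default

  def main(args: Array[String]): Unit = {
    val without = needsOrdering(new Key) // compiler inserts the null default
    val withOrd = {
      implicit val keyOrd: Ordering[Key] = new Ordering[Key] {
        override def compare(x: Key, y: Key): Int = 0
      }
      needsOrdering(new Key) // compiler passes keyOrd explicitly
    }
    println((without, withOrd)) // prints (false,true)
  }
}
```

Either way the program behaves the same at runtime here; the point is that the compiler emits a different call site in each case, which is exactly the bytecode divergence the PR is trying to pin down.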
[GitHub] spark pull request: [SPARK-4883][Shuffle] Add a name to the direct...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/3734#discussion_r22092572 --- Diff: network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java --- @@ -60,8 +61,11 @@ private final TransportConf conf; public ExternalShuffleBlockManager(TransportConf conf) { -// TODO: Give this thread a name. -this(conf, Executors.newSingleThreadExecutor()); +this(conf, Executors.newSingleThreadExecutor( --- End diff -- Actually, this should be daemon, I think, otherwise it may prevent the JVM from exiting even when it's not doing anything. If we wanted to prevent the service from exiting before cleanup has completed, I would rather block in a stop() method until the queue of directories to delete has been drained, but I do not think this too important. Unlike Spark Executors, the graceful termination of this service should be uncorrelated with buildup of shuffle data. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
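One way the named daemon thread factory under discussion can look, sketched in Scala over the standard `java.util.concurrent` API. The factory and the thread name are illustrative; they are not the PR's actual code.

```scala
// Sketch of a named, daemon single-thread executor, per the discussion above.
// A daemon thread will not keep the JVM alive once all non-daemon threads
// exit, which addresses the shutdown concern raised in the review.
import java.util.concurrent.{Callable, Executors, ThreadFactory}

object NamedDaemonFactory {
  def factory(name: String): ThreadFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, name)
      t.setDaemon(true) // do not block JVM exit on pending cleanup work
      t
    }
  }

  def main(args: Array[String]): Unit = {
    val exec = Executors.newSingleThreadExecutor(factory("shuffle-dir-cleaner"))
    val info = exec.submit(new Callable[(String, Boolean)] {
      override def call(): (String, Boolean) =
        (Thread.currentThread.getName, Thread.currentThread.isDaemon)
    }).get()
    println(info) // prints (shuffle-dir-cleaner,true)
    exec.shutdown()
  }
}
```

If graceful drain were required instead, a `stop()` that calls `shutdown()` followed by `awaitTermination` would block until the deletion queue empties, matching the alternative the reviewer describes.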
[GitHub] spark pull request: [SPARK-1953][YARN]yarn client mode Application...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3607#issuecomment-67606014 [Test build #24628 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24628/consoleFull) for PR 3607 at commit [`2b27928`](https://github.com/apache/spark/commit/2b2792883b0b12e73fb5a1b957de75a0b53492c7). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-1953][YARN]yarn client mode Application...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3607#issuecomment-67606019 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24628/
[GitHub] spark pull request: [SPARK-4893] Add test fixture for resetting sy...
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/3739#issuecomment-67606021 > The trickiness of this trait ordering might be an argument against this approach. Yeah. But if it's well documented, I won't be against it. Really a convenient feature.
[GitHub] spark pull request: Small refactoring to pass SparkEnv into Execut...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3738#issuecomment-67605600 [Test build #24632 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24632/consoleFull) for PR 3738 at commit [`217062a`](https://github.com/apache/spark/commit/217062a4e045b2c1c4d41c4b07fe4b7c58fabb64). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4465] runAsSparkUser doesn't affect Tas...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3741#issuecomment-67605602 [Test build #24631 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24631/consoleFull) for PR 3741 at commit [`3d6631f`](https://github.com/apache/spark/commit/3d6631f6006811fc2ffdd65a90913f96022ac34d). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4893] Add test fixture for resetting sy...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/3739#issuecomment-67605537 > Ideally, a test should not modify environment shared by other tests. Or we cannot make the tests run in parallel. We can achieve parallelism by running multiple tests in separate JVMs. This PR doesn't introduce any new forms of sharing between tests; it only attempts to fix bugs in cases where we have such sharing but don't clean up properly (e.g. by setting some system properties then forgetting to clear them or to change them back to the old values).
[GitHub] spark pull request: [SPARK-4893] Add test fixture for resetting sy...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/3739#issuecomment-67605441 That looks like a [legitimate test failure](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24618/testReport/). It looks like the issue there was that `ResetSystemProperties` appeared after `BeforeAndAfter`. The trickiness of this trait ordering might be an argument against this approach.
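The trait-ordering pitfall described here can be reproduced with plain stackable traits, no ScalaTest required. Scala linearizes mixins so the last trait mixed in wraps the others, which is why a save/restore fixture must end up outermost. The trait names below are invented for the illustration:

```scala
// Plain-Scala model of stackable test fixtures: each trait wraps run() via
// abstract override, and linearization makes the LAST trait mixed in the
// outermost wrapper. A save/restore fixture only protects modifications that
// happen inside its wrapping, so mixin order matters.
trait Runner { def run(log: StringBuilder): Unit = log.append("test;") }

trait SavesProps extends Runner {
  abstract override def run(log: StringBuilder): Unit = {
    log.append("save;"); super.run(log); log.append("restore;")
  }
}

trait TouchesProps extends Runner {
  abstract override def run(log: StringBuilder): Unit = {
    log.append("set;"); super.run(log)
  }
}

object TraitOrderDemo {
  def main(args: Array[String]): Unit = {
    val good = new StringBuilder
    (new Runner with TouchesProps with SavesProps {}).run(good)
    println(good) // prints save;set;test;restore;

    val bad = new StringBuilder
    (new Runner with SavesProps with TouchesProps {}).run(bad)
    println(bad) // prints set;save;test;restore; (the set escapes the fixture)
  }
}
```

In the bad ordering the modification happens before the snapshot is taken, so the "restore" puts back the already-modified state, which is the kind of leak the test failure above exposed.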
[GitHub] spark pull request: [SPARK-4465] runAsSparkUser doesn't affect Tas...
GitHub user jongyoul opened a pull request: https://github.com/apache/spark/pull/3741 [SPARK-4465] runAsSparkUser doesn't affect TaskRunner in Mesos environment at all. - fixed a scope of runAsSparkUser from MesosExecutorDriver.run to MesosExecutorBackend.launchTask - See the Jira Issue for more details. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jongyoul/spark SPARK-4465 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/3741.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3741 commit 2343f13347cc7b9d5e72b4ee85bf206e21440792 Author: Jongyoul Lee Date: 2014-12-19T07:05:39Z [SPARK-4465] runAsSparkUser doesn't affect TaskRunner in Mesos environment at all. - fixed a scope of runAsSparkUser from MesosExecutorDriver.run to MesosExecutorBackend.launchTask
[GitHub] spark pull request: [SPARK-4893] Add test fixture for resetting sy...
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/3739#issuecomment-67605399 Ideally, a test should not modify the environment shared by other tests; otherwise we cannot make the tests run in parallel.
[GitHub] spark pull request: Small refactoring to pass SparkEnv into Execut...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/3738#issuecomment-67605336 retest this please
[GitHub] spark pull request: [SPARK-4573] [SQL] Add SettableStructObjectIns...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/3429#discussion_r22092141 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala --- @@ -119,12 +269,44 @@ private[hive] trait HiveInspectors { System.arraycopy(writable.getBytes, 0, temp, 0, temp.length) temp case poi: WritableConstantDateObjectInspector => poi.getWritableConstantValue.get() -case hvoi: HiveVarcharObjectInspector => hvoi.getPrimitiveJavaObject(data).getValue -case hdoi: HiveDecimalObjectInspector => HiveShim.toCatalystDecimal(hdoi, data) -// org.apache.hadoop.hive.serde2.io.TimestampWritable.set will reset current time object -// if next timestamp is null, so Timestamp object is cloned -case ti: TimestampObjectInspector => ti.getPrimitiveJavaObject(data).clone() -case pi: PrimitiveObjectInspector => pi.getPrimitiveJavaObject(data) +case mi: StandardConstantMapObjectInspector => + // take the value from the map inspector object, rather than the input data + mi.getWritableConstantValue.map { case (k, v) => +(unwrap(k, mi.getMapKeyObjectInspector), + unwrap(v, mi.getMapValueObjectInspector)) + }.toMap +case li: StandardConstantListObjectInspector => + // take the value from the list inspector object, rather than the input data + li.getWritableConstantValue.map(unwrap(_, li.getListElementObjectInspector)).toSeq +// if the value is null, we don't care about the object inspector type +case _ if data == null => null +case poi: VoidObjectInspector => null // always be null for void object inspector +case pi: PrimitiveObjectInspector => pi match { + // We think HiveVarchar is also a String + case hvoi: HiveVarcharObjectInspector if hvoi.preferWritable() => +hvoi.getPrimitiveWritableObject(data).getHiveVarchar.getValue + case hvoi: HiveVarcharObjectInspector => hvoi.getPrimitiveJavaObject(data).getValue + case x: StringObjectInspector if x.preferWritable() => +x.getPrimitiveWritableObject(data).toString + case x: IntObjectInspector 
if x.preferWritable() => x.get(data) + case x: BooleanObjectInspector if x.preferWritable() => x.get(data) + case x: FloatObjectInspector if x.preferWritable() => x.get(data) + case x: DoubleObjectInspector if x.preferWritable() => x.get(data) + case x: LongObjectInspector if x.preferWritable() => x.get(data) + case x: ShortObjectInspector if x.preferWritable() => x.get(data) + case x: ByteObjectInspector if x.preferWritable() => x.get(data) + case x: HiveDecimalObjectInspector => HiveShim.toCatalystDecimal(x, data) + case x: BinaryObjectInspector if x.preferWritable() => +x.getPrimitiveWritableObject(data).copyBytes() --- End diff -- Since this didn't break the pull request builder and it's nighttime now (so we're probably not merging tons of stuff), I'm going to hold off on reverting this for a little bit to see if we can come up with a quick hotfix. Otherwise, I'll revert this commit when I get up tomorrow. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4573] [SQL] Add SettableStructObjectIns...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/3429#discussion_r22092047 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala --- @@ -119,12 +269,44 @@ private[hive] trait HiveInspectors { System.arraycopy(writable.getBytes, 0, temp, 0, temp.length) temp case poi: WritableConstantDateObjectInspector => poi.getWritableConstantValue.get() -case hvoi: HiveVarcharObjectInspector => hvoi.getPrimitiveJavaObject(data).getValue -case hdoi: HiveDecimalObjectInspector => HiveShim.toCatalystDecimal(hdoi, data) -// org.apache.hadoop.hive.serde2.io.TimestampWritable.set will reset current time object -// if next timestamp is null, so Timestamp object is cloned -case ti: TimestampObjectInspector => ti.getPrimitiveJavaObject(data).clone() -case pi: PrimitiveObjectInspector => pi.getPrimitiveJavaObject(data) +case mi: StandardConstantMapObjectInspector => + // take the value from the map inspector object, rather than the input data + mi.getWritableConstantValue.map { case (k, v) => +(unwrap(k, mi.getMapKeyObjectInspector), + unwrap(v, mi.getMapValueObjectInspector)) + }.toMap +case li: StandardConstantListObjectInspector => + // take the value from the list inspector object, rather than the input data + li.getWritableConstantValue.map(unwrap(_, li.getListElementObjectInspector)).toSeq +// if the value is null, we don't care about the object inspector type +case _ if data == null => null +case poi: VoidObjectInspector => null // always be null for void object inspector +case pi: PrimitiveObjectInspector => pi match { + // We think HiveVarchar is also a String + case hvoi: HiveVarcharObjectInspector if hvoi.preferWritable() => +hvoi.getPrimitiveWritableObject(data).getHiveVarchar.getValue + case hvoi: HiveVarcharObjectInspector => hvoi.getPrimitiveJavaObject(data).getValue + case x: StringObjectInspector if x.preferWritable() => +x.getPrimitiveWritableObject(data).toString + case x: IntObjectInspector 
if x.preferWritable() => x.get(data) + case x: BooleanObjectInspector if x.preferWritable() => x.get(data) + case x: FloatObjectInspector if x.preferWritable() => x.get(data) + case x: DoubleObjectInspector if x.preferWritable() => x.get(data) + case x: LongObjectInspector if x.preferWritable() => x.get(data) + case x: ShortObjectInspector if x.preferWritable() => x.get(data) + case x: ByteObjectInspector if x.preferWritable() => x.get(data) + case x: HiveDecimalObjectInspector => HiveShim.toCatalystDecimal(x, data) + case x: BinaryObjectInspector if x.preferWritable() => +x.getPrimitiveWritableObject(data).copyBytes() --- End diff -- It looks like this PR may have caused a build break for the hadoop1.0 profile: https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/1245/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=centos/

```
[warn] Note: Recompile with -Xlint:unchecked for details.
[info] Compiling 21 Scala sources and 1 Java source to /home/jenkins/workspace/Spark-Master-SBT/AMPLAB_JENKINS_BUILD_PROFILE/hadoop1.0/label/centos/sql/hive/target/scala-2.10/classes...
[error] /home/jenkins/workspace/Spark-Master-SBT/AMPLAB_JENKINS_BUILD_PROFILE/hadoop1.0/label/centos/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala:300: value copyBytes is not a member of org.apache.hadoop.io.BytesWritable
[error]     x.getPrimitiveWritableObject(data).copyBytes()
[error]                                        ^
[error] one error found
[error] (hive/compile:compile) Compilation failed
[error] Total time: 117 s, completed Dec 18, 2014 8:51:43 PM
[error] Got a return code of 1 on line 155 of the run-tests script.
Build step 'Execute shell' marked build as failure
```
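The failure above comes from `BytesWritable.copyBytes()`, which exists in Hadoop 2.x but not in Hadoop 1.x. A hedged sketch of a version-agnostic equivalent follows; the `BytesWritableLike` stand-in below only mimics `BytesWritable`'s `getBytes`/`getLength` so the snippet is self-contained, and this is not the hotfix that was actually merged:

```scala
// Minimal stand-in for org.apache.hadoop.io.BytesWritable: getBytes returns a
// possibly over-allocated backing array, getLength the count of valid bytes.
class BytesWritableLike(data: Array[Byte]) {
  private val backing = java.util.Arrays.copyOf(data, data.length + 8) // padded, like BytesWritable
  def getBytes: Array[Byte] = backing
  def getLength: Int = data.length
}

// Hadoop-1-compatible replacement for copyBytes(): copy only the valid prefix
// of the backing array, mirroring the System.arraycopy pattern already used
// elsewhere in HiveInspectors for other writables.
def copyBytesCompat(bw: BytesWritableLike): Array[Byte] = {
  val result = new Array[Byte](bw.getLength)
  System.arraycopy(bw.getBytes, 0, result, 0, bw.getLength)
  result
}
```

Because it relies only on the Hadoop 1.x surface of the API, the same call compiles against both profiles.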
[GitHub] spark pull request: [SPARK-4890] Upgrade Boto to 2.34.0; automatic...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/3737#issuecomment-67604433 The deprecation warning idea was good; it turns out that there's a pending deprecation which will change the semantics of one of the methods we use, so I upgraded the code according to the documentation's suggestion. For obtaining code coverage metrics, I applied the following change to the Bash script:

```diff
diff --git a/ec2/spark-ec2 b/ec2/spark-ec2
index 3abd3f3..4e3fc69 100755
--- a/ec2/spark-ec2
+++ b/ec2/spark-ec2
@@ -22,4 +22,6 @@
 #+ the underlying Python script.
 SPARK_EC2_DIR="$(dirname $0)"
-python -Wdefault "${SPARK_EC2_DIR}/spark_ec2.py" "$@"
+
+export PYTHONWARNINGS="default"
+coverage run -a "${SPARK_EC2_DIR}/spark_ec2.py" "$@"
```

The `-a` option tells `coverage` to accumulate information across multiple runs. I performed an iterative process where I interactively ran `spark-ec2`, used `coverage html` to generate a report, then went back and ran more commands to exercise the code areas that I missed. With the workloads that I ran (launching spot clusters, stopping and starting a cluster, destroying / creating security groups, logging in, canceling spot instance requests), I got to 80% line coverage; most of the lines that I missed were error-handling code. Here's a link to an ASCII coverage report, produced with `coverage annotate spark_ec2.py`: https://gist.github.com/JoshRosen/c09a742805bae3503185 According to the docs:

> Usage: coverage annotate [options] [modules]
> Make annotated copies of the given files, marking statements that are executed with > and statements that are missed with !.

As you can see, the coverage is pretty good. Therefore, I'd be comfortable merging this PR now.
[GitHub] spark pull request: [SPARK-4890] Upgrade Boto to 2.34.0; automatic...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3737#issuecomment-67604418 [Test build #24630 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24630/consoleFull) for PR 3737 at commit [`f02935d`](https://github.com/apache/spark/commit/f02935dd37cdbbf8f9f38396d415cfba74177dba). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4873][Streaming] WriteAheadLogBasedBloc...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/3721#discussion_r22091865 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala --- @@ -125,7 +125,7 @@ private[streaming] class WriteAheadLogManager( * between the node calculating the threshTime (say, driver node), and the local system time * (say, worker node), the caller has to take account of possible time skew. */ - def cleanupOldLogs(threshTime: Long): Unit = { + def cleanupOldLogs(threshTime: Long): Future[Unit] = { --- End diff -- What's more, if we really want to change it to asynchronous deletion, returning a Future still works. Just write something like:

```scala
deleteFiles()
Promise[Unit]().success(()).future
```
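As a self-contained illustration of the pattern in the snippet above, with a no-op `deleteFiles` standing in for the real log deletion, `Future.successful` is the idiomatic shorthand for completing a fresh `Promise`:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Stand-in for the synchronous file-deletion work.
def deleteFiles(): Unit = ()

// Do the work eagerly, then hand back an already-completed Future so the
// method keeps a Future[Unit] signature without spawning any thread.
def cleanupOldLogs(threshTime: Long): Future[Unit] = {
  deleteFiles()
  Future.successful(())
}
```

A caller that does `Await.result` on this future returns immediately, since the future is completed before it is returned.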
[GitHub] spark pull request: SPARK-4297 [BUILD] Build warning fixes omnibus
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3157#issuecomment-67604251 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24626/ Test FAILed.
[GitHub] spark pull request: SPARK-4297 [BUILD] Build warning fixes omnibus
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3157#issuecomment-67604244 [Test build #24626 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24626/consoleFull) for PR 3157 at commit [`cd50b2b`](https://github.com/apache/spark/commit/cd50b2b218ba34574e99bcf681f5ab6dc27fe573). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-4873][Streaming] WriteAheadLogBasedBloc...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/3721#discussion_r22091804 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala --- @@ -125,7 +125,7 @@ private[streaming] class WriteAheadLogManager( * between the node calculating the threshTime (say, driver node), and the local system time * (say, worker node), the caller has to take account of possible time skew. */ - def cleanupOldLogs(threshTime: Long): Unit = { + def cleanupOldLogs(threshTime: Long): Future[Unit] = { --- End diff --

> The thing is that deleting asynchronously is an implementation detail - which we choose to do now, which can be changed later, don't you think? If we change it later, we'd have to change this method's signature - which can cause pain if there is code that uses Await.result(*) on this future.

I doubt it will be changed. However, asynchrony is an important implementation detail that the caller should know about, or they may misuse this method.

> If we expose it via a parameter, we can choose to ignore the param and still the calling code will not have to change. Since it is unlikely that the calling code will actually depend on the async nature, it is unlikely to see any difference in functionality and no change in code is required.

I don't think one parameter is enough. At the least, it needs one more parameter: a `timeout`. In your PR, you used `1 second`, which may not be enough.
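To make the timeout point concrete, here is a hypothetical sketch (the method and helper names are illustrative, not from the PR): if deletion runs asynchronously, exposing the `Future` lets each call site pick its own timeout instead of relying on a hard-coded `1 second`.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Illustrative stand-in for the real log-deletion work.
def deleteFiles(threshTime: Long): Unit = ()

// Asynchronous cleanup that surfaces its Future to callers.
def cleanupOldLogs(threshTime: Long): Future[Unit] =
  Future { deleteFiles(threshTime) }

// Each call site chooses a timeout appropriate to its context.
def awaitCleanup(threshTime: Long, timeout: Duration): Unit =
  Await.result(cleanupOldLogs(threshTime), timeout)
```

Callers that do not care when deletion finishes can simply ignore the returned future; callers that do care block with a timeout they control.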
[GitHub] spark pull request: [SPARK-4871][SQL] Show sql statement in spark ...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/3718#issuecomment-67604099 @marmbrus, is this ok to go?
[GitHub] spark pull request: [WIP][SPARK-4844][MLLIB]SGD should support cus...
Github user witgo commented on the pull request: https://github.com/apache/spark/pull/3729#issuecomment-67603907 This PR is temporarily closed.
[GitHub] spark pull request: [WIP][SPARK-4844][MLLIB]SGD should support cus...
Github user witgo closed the pull request at: https://github.com/apache/spark/pull/3729
[GitHub] spark pull request: SPARK-3428. TaskMetrics for running tasks is m...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/3684
[GitHub] spark pull request: SPARK-3428. TaskMetrics for running tasks is m...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/3684#issuecomment-67603755 This looks good to me, so I'm going to merge this into `master` (1.3.0), `branch-1.2` (1.2.1), and `branch-1.1` (1.1.2). Thanks!
[GitHub] spark pull request: [SPARK-4871][SQL] Show sql statement in spark ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3718#issuecomment-67603749 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24625/ Test PASSed.
[GitHub] spark pull request: [SPARK-4871][SQL] Show sql statement in spark ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3718#issuecomment-67603746 [Test build #24625 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24625/consoleFull) for PR 3718 at commit [`e0d6b5d`](https://github.com/apache/spark/commit/e0d6b5d7fee2bbd888a262c7e1d32b7dc5860c80). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: Test parallelization WIP
Github user JoshRosen closed the pull request at: https://github.com/apache/spark/pull/3681
[GitHub] spark pull request: [SPARK-4094][CORE] checkpoint should still be ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2956#issuecomment-67603039 [Test build #24629 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24629/consoleFull) for PR 2956 at commit [`be7c1fa`](https://github.com/apache/spark/commit/be7c1fae8deb2922b276fca2c46f747c2cdb05f1). * This patch merges cleanly.
[GitHub] spark pull request: [branch-1.0][SPARK-4148][PySpark] fix seed dis...
Github user mengxr closed the pull request at: https://github.com/apache/spark/pull/3106
[GitHub] spark pull request: [SPARK-2075][Core] Make the compiler generate ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3740#issuecomment-67602317 [Test build #24624 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24624/consoleFull) for PR 3740 at commit [`ca03559`](https://github.com/apache/spark/commit/ca03559acf8e01afe4f0fa5d6c15a9283e9ee975). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient `
[GitHub] spark pull request: [SPARK-2075][Core] Make the compiler generate ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3740#issuecomment-67602319 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24624/ Test PASSed.
[GitHub] spark pull request: [SPARK-4526][MLLIB]Gradient should be added ba...
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/3677#issuecomment-67602032 Had an offline discussion with @witgo. The iterator interface looks good to me, but creating composite methods could be avoided.
[GitHub] spark pull request: [WIP][SPARK-4844][MLLIB]SGD should support cus...
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/3729#issuecomment-67601970 Had an offline discussion with @witgo. The current implementation takes advantage of Parquet's serialization over Java's. However, this is too specialized for this particular algorithm. I asked @witgo to check the implementation of gap sampling and see how to apply it to mini-batch sampling.
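For context, here is a sketch of the gap sampling idea mentioned above, under the assumption that it would replace per-element coin flips in mini-batch sampling (Spark's real implementation lives in its internal sampling utilities and differs in detail): rather than testing every element against the fraction `p`, draw the size of each gap between kept elements from the geometric distribution.

```scala
import scala.util.Random

// Gap sampling sketch: for small sampling fractions p, drawing geometric
// gaps, floor(log(u) / log(1 - p)), skips runs of unsampled elements in a
// single step instead of generating one random number per element.
def gapSample[T](data: IndexedSeq[T], p: Double, rng: Random): IndexedSeq[T] = {
  require(p > 0.0 && p < 1.0, "sampling fraction must be in (0, 1)")
  val lnQ = math.log(1.0 - p)
  val out = Vector.newBuilder[T]
  def nextGap(): Int = math.floor(math.log(rng.nextDouble()) / lnQ).toInt
  var i = nextGap()                 // index of the first sampled element
  while (i >= 0 && i < data.length) {
    out += data(i)
    i += 1 + nextGap()              // jump straight to the next sampled index
  }
  out.result()
}
```

Each element is still included independently with probability `p`, but the number of random draws is proportional to the sample size rather than the input size, which is the win when `p` is small.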
[GitHub] spark pull request: [SPARK-4006] Block Manager - Double Register C...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2854#issuecomment-67601642 [Test build #24627 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24627/consoleFull) for PR 2854 at commit [`95ae4db`](https://github.com/apache/spark/commit/95ae4dbe090f16e863614a2f4f2e0b0fb90ec1af). * This patch **fails some tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-4006] Block Manager - Double Register C...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2854#issuecomment-67601648 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24627/ Test FAILed.
[GitHub] spark pull request: [SPARK-3967] don’t redundantly overwrite exe...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/2848#issuecomment-67601524 This looks good to me, so I'm ready to merge it. One final little thing, though: since we have two separate PRs that address SPARK-3967, I'd like to create a new JIRA for this fix just in case this doesn't fix the YARN issue in the original ticket. Therefore, I've opened https://issues.apache.org/jira/browse/SPARK-4896. Do you mind editing this PR's title to reference that JIRA instead? Once you do that, I'll merge this into all of the maintenance branches and resolve the original JIRA. Thanks for your patience during this long review and with the build difficulties. I think that the end result here is a significant improvement over the old `fetchFile` code, so I'm looking forward to merging this.
[GitHub] spark pull request: [SPARK-1953][YARN]yarn client mode Application...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3607#issuecomment-67601463 [Test build #24628 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24628/consoleFull) for PR 3607 at commit [`2b27928`](https://github.com/apache/spark/commit/2b2792883b0b12e73fb5a1b957de75a0b53492c7). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-1953][YARN]yarn client mode Application...
Github user WangTaoTheTonic commented on a diff in the pull request: https://github.com/apache/spark/pull/3607#discussion_r22090815 --- Diff: docs/running-on-yarn.md --- @@ -22,6 +22,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes Property NameDefaultMeaning + spark.yarn.am.memory + 512m + +Amount of memory to use for the Yarn ApplicationMaster in client mode, in the same format as JVM memory strings (e.g. 512m, 2g). --- End diff -- Thanks for catching that. Updated.
[GitHub] spark pull request: [SPARK-4140] Document dynamic allocation
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/3731#discussion_r22090703 --- Diff: docs/job-scheduling.md --- @@ -56,6 +56,112 @@ the same RDDs. For example, the [Shark](http://shark.cs.berkeley.edu) JDBC serve queries. In future releases, in-memory storage systems such as [Tachyon](http://tachyon-project.org) will provide another approach to share RDDs. +## Dynamic Resource Allocation + +Spark 1.2 introduces the ability to dynamically scale the set of cluster resources allocated to +your application up and down based on the workload. This means that your application may give +resources back to the cluster if they are no longer used and request them again later when there +is demand. This feature is particularly useful if multiple applications share resources in your +Spark cluster. If a subset of the resources allocated to an application becomes idle, it can be +returned to the cluster's pool of resources and acquired by other applications. In Spark, dynamic +resource allocation is performed on the granularity of the executor and can be enabled through +`spark.dynamicAllocation.enabled`. + +This feature is currently disabled by default and available only on [YARN](running-on-yarn.html). +A future release will extend this to [standalone mode](spark-standalone.html) and +[Mesos coarse-grained mode](running-on-mesos.html#mesos-run-modes). Note that although Spark on +Mesos already has a similar notion of dynamic resource sharing in fine-grained mode, enabling +dynamic allocation allows your Mesos application to take advantage of coarse-grained low-latency +scheduling while sharing cluster resources efficiently. + +Lastly, it is worth noting that Spark's dynamic resource allocation mechanism is cooperative. +This means if a Spark application enables this feature, other applications on the same cluster +are also expected to do so. 
Otherwise, the cluster's resources will end up being unfairly +distributed to the applications that do not voluntarily give up unused resources they have +acquired. + +### Configuration and Setup + +All configurations used by this feature live under the `spark.dynamicAllocation.*` namespace. +To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true` and +provide lower and upper bounds for the number of executors through +`spark.dynamicAllocation.minExecutors` and `spark.dynamicAllocation.maxExecutors`. Other relevant +configurations are described on the [configurations page](configuration.html#dynamic-allocation) +and in the subsequent sections in detail. + +Additionally, your application must use an external shuffle service (described below). To enable +this, set `spark.shuffle.service.enabled` to `true`. In YARN, this external shuffle service is +implemented in `org.apache.spark.yarn.network.YarnShuffleService` that runs in each `NodeManager` --- End diff -- Are you saying this section will be replaced with a pointer to the external shuffle service doc once it's added? If so, looks good to me.
[GitHub] spark pull request: [SPARK-4674] Refactor getCallSite
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/3532
[GitHub] spark pull request: [SPARK-4674] Refactor getCallSite
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/3532#issuecomment-67600571 This looks good to me (most of the diff is just an indentation change), so I'm going to merge this to `master` (1.3.0). Thanks! Just out of curiosity, what motivated this change? Did you notice a performance issue here?
[GitHub] spark pull request: [SPARK-4006] Block Manager - Double Register C...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2854#issuecomment-67600250 [Test build #24627 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24627/consoleFull) for PR 2854 at commit [`95ae4db`](https://github.com/apache/spark/commit/95ae4dbe090f16e863614a2f4f2e0b0fb90ec1af). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4006] Block Manager - Double Register C...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/2854#issuecomment-67600208 Jenkins, retest this please.
[GitHub] spark pull request: [SPARK-1953][YARN]yarn client mode Application...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/3607#discussion_r22090369 --- Diff: docs/running-on-yarn.md --- @@ -22,6 +22,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes Property NameDefaultMeaning + spark.yarn.am.memory + 512m + +Amount of memory to use for the Yarn ApplicationMaster in client mode, in the same format as JVM memory strings (e.g. 512m, 2g). --- End diff -- Nit: YARN should be capitalized and application master should be two words. This applies in a few other places as well.
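For readers of the archive, the property discussed in the diff above might be set like this in `spark-defaults.conf`. This is an illustrative sketch only; `1g` is an example value, the documented default being `512m`.

```properties
# Client-mode only: memory for the YARN ApplicationMaster, in JVM
# memory-string format (e.g. 512m, 2g). 1g is an example value here;
# the documented default is 512m.
spark.yarn.am.memory   1g
```

The same property can equally be passed at submit time via `spark-submit --conf spark.yarn.am.memory=1g`.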
[GitHub] spark pull request: [SPARK-1953][YARN]yarn client mode Application...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3607#issuecomment-67599686 [Test build #24623 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24623/consoleFull) for PR 3607 at commit [`b7acbb2`](https://github.com/apache/spark/commit/b7acbb266383f6bd6afe6bde936f9de9121b5ab5). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-1953][YARN]yarn client mode Application...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3607#issuecomment-67599691 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24623/ Test PASSed.
[GitHub] spark pull request: SPARK-4297 [BUILD] Build warning fixes omnibus
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3157#issuecomment-67599082 [Test build #24626 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24626/consoleFull) for PR 3157 at commit [`cd50b2b`](https://github.com/apache/spark/commit/cd50b2b218ba34574e99bcf681f5ab6dc27fe573). * This patch merges cleanly.
[GitHub] spark pull request: SPARK-4297 [BUILD] Build warning fixes omnibus
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/3157#issuecomment-67598898 test this please
[GitHub] spark pull request: [SPARK-4728][MLLib] Add exponential, gamma, an...
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/3680#issuecomment-67598867 LGTM. Merged into master. I've assigned the Python API JIRA to you and set the target version to 1.3.0. So it is not in a hurry. Happy holidays!
[GitHub] spark pull request: [SPARK-4728][MLLib] Add exponential, gamma, an...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/3680
[GitHub] spark pull request: [SPARK-3000][CORE] drop old blocks to disk in ...
Github user liyezhang556520 commented on the pull request: https://github.com/apache/spark/pull/2134#issuecomment-67598353 Hi @andrewor14, [PR#3629](https://github.com/apache/spark/pull/3629) solved the problem I pointed out in your original patch [PR#1165](https://github.com/apache/spark/pull/1165); you can check the comment history from Aug 12th. That bug is not the main focus of this PR, though this PR resolves it along the way. The main focus here is the disk IO issue in memory dropping: only one thread drops memory when cached RDD blocks need to be evicted to disk. This problem was also pointed out in [PR#791](https://github.com/apache/spark/pull/791). The main difference between this PR and [PR#791](https://github.com/apache/spark/pull/791) is that this PR also makes the `tryToPut` process run in parallel, which makes the memory accounting more complex. This PR also makes some changes to the test suite file.
[GitHub] spark pull request: [SPARK-4871][SQL] Show sql statement in spark ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3718#issuecomment-67598149 [Test build #24625 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24625/consoleFull) for PR 3718 at commit [`e0d6b5d`](https://github.com/apache/spark/commit/e0d6b5d7fee2bbd888a262c7e1d32b7dc5860c80). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4861][SQL] Refactory command in spark s...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/3712
[GitHub] spark pull request: [SPARK-4861][SQL] Refactory command in spark s...
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/3712#issuecomment-67597165 Thanks! Merged to master.
[GitHub] spark pull request: [SPARK-4573] [SQL] Add SettableStructObjectIns...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/3429
[GitHub] spark pull request: [SPARK-2075][Core] Make the compiler generate ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3740#issuecomment-67597016 [Test build #24624 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24624/consoleFull) for PR 3740 at commit [`ca03559`](https://github.com/apache/spark/commit/ca03559acf8e01afe4f0fa5d6c15a9283e9ee975). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4573] [SQL] Add SettableStructObjectIns...
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/3429#issuecomment-67596996 Thanks! Merged to master.
[GitHub] spark pull request: [SPARK-2554][SQL] Supporting SumDistinct parti...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/3348
[GitHub] spark pull request: [SPARK-2554][SQL] Supporting SumDistinct parti...
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/3348#issuecomment-67596915 Thanks! Merged to master.