[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-56242298 Merged. Thanks a lot!
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/2378
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-56241679 @davies LGTM except for a few linear algebra operators and caching. But those are orthogonal to this PR. I'm merging this and we will update the linear algebra ops later.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-56216817 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20576/consoleFull) for PR 2378 at commit [`dffbba2`](https://github.com/apache/spark/commit/dffbba2ba206bbbd3dfc740a55f1b0df341860e7). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes.
Github user davies commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-56211052 @mengxr In this PR, I tried to avoid any changes other than serialization; we could change the cache behavior or compression later. It would be good to have some numbers about the performance regression; I only see a 5% regression in LogisticRegressionWithSGD.train() with a small dataset (locally).
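One possible way to collect those numbers is to time the call locally on a small synthetic dataset. The sketch below is illustrative only: the dataset shape, its size, and the iteration count are made up, and `sc` is assumed to be an existing SparkContext.

```python
# Hypothetical local timing of LogisticRegressionWithSGD.train(); all values are made up.
import time
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint

points = sc.parallelize(
    [LabeledPoint(i % 2, [float(i % 10), float(i % 7)]) for i in range(10000)]).cache()
points.count()  # materialize the cache so caching time is not included in the measurement
start = time.time()
LogisticRegressionWithSGD.train(points, iterations=10)
print("train() took %.2f s" % (time.time() - start))
```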
Github user davies commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-56210084 @mengxr PickleSerializer does not compress data; there is a CompressedSerializer that can do it using gzip (level 1). Compression can help for doubles in a small range or with repeated values, but it will be worse with random doubles over a large range. BatchedSerializer can help reduce the per-record overhead of the class name. In the JVM, the memory of short-lived objects cannot be reused without GC, so batched serialization will not increase GC pressure as long as the batch size is not too large (depending on how GC is configured).
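For concreteness, the trade-off described above can be poked at directly with PySpark's serializer classes. This is only a rough sketch; the class names (`PickleSerializer`, `CompressedSerializer`, `BatchedSerializer`) and their behavior are as I understand them from pyspark/serializers.py around Spark 1.1 and are worth double-checking against your version.

```python
# Rough sketch: compare pickled sizes with and without compression, and the effect of batching.
from io import BytesIO
from pyspark.serializers import PickleSerializer, CompressedSerializer, BatchedSerializer

values = [1.0] * 10000                      # repeated values, the friendly case for compression
plain = PickleSerializer()
print(len(plain.dumps(values)))                           # pickle only
print(len(CompressedSerializer(plain).dumps(values)))     # pickle + compression

# Batching pickles records in groups instead of one at a time, amortizing the
# per-record overhead (framing, repeated class names).
unbatched, batched = BytesIO(), BytesIO()
plain.dump_stream(iter(values), unbatched)
BatchedSerializer(plain, 1024).dump_stream(iter(values), batched)
print(len(unbatched.getvalue()), len(batched.getvalue()))
```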
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-56207099 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20576/consoleFull) for PR 2378 at commit [`dffbba2`](https://github.com/apache/spark/commit/dffbba2ba206bbbd3dfc740a55f1b0df341860e7). * This patch merges cleanly.
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-56147622 @davies Does `PickleSerializer` compress data? If not, maybe we should cache the deserialized RDD instead of the one from `_.reserialize`. They have the same storage. I understand that batch-serialization can help GC. But algorithms like linear methods should only allocate short-lived objects. Is batch-serialization worth the tradeoff?
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-56136476 test this please
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-56122608 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20560/consoleFull) for PR 2378 at commit [`810f97f`](https://github.com/apache/spark/commit/810f97f53befdb262e55e736500d909b5f869f1a). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-56117852 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20560/consoleFull) for PR 2378 at commit [`810f97f`](https://github.com/apache/spark/commit/810f97f53befdb262e55e736500d909b5f869f1a). * This patch merges cleanly.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-56116566 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/132/consoleFull) for PR 2378 at commit [`032cd62`](https://github.com/apache/spark/commit/032cd62cee6b2bd134f6b9017a7e68ef333990a5). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds no public classes.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-56114946 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20554/consoleFull) for PR 2378 at commit [`032cd62`](https://github.com/apache/spark/commit/032cd62cee6b2bd134f6b9017a7e68ef333990a5). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds no public classes.
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17760498 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala --- @@ -476,259 +436,167 @@ class PythonMLLibAPI extends Serializable { numRows: Long, numCols: Int, numPartitions: java.lang.Integer, - seed: java.lang.Long): JavaRDD[Array[Byte]] = { + seed: java.lang.Long): JavaRDD[Vector] = { val parts = getNumPartitionsOrDefault(numPartitions, jsc) val s = getSeedOrDefault(seed) -RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector) +RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s) } } /** - * :: DeveloperApi :: - * MultivariateStatisticalSummary with Vector fields serialized. + * SerDe utility functions for PythonMLLibAPI. */ -@DeveloperApi -class MultivariateStatisticalSummarySerialized(val summary: MultivariateStatisticalSummary) - extends Serializable { +private[spark] object SerDe extends Serializable { - def mean: Array[Byte] = SerDe.serializeDoubleVector(summary.mean) + val PYSPARK_PACKAGE = "pyspark.mllib" - def variance: Array[Byte] = SerDe.serializeDoubleVector(summary.variance) + /** + * Base class used for pickle + */ + private[python] abstract class BasePickler[T: ClassTag] +extends IObjectPickler with IObjectConstructor { + +private val cls = implicitly[ClassTag[T]].runtimeClass +private val module = PYSPARK_PACKAGE + "." + cls.getName.split('.')(4) +private val name = cls.getSimpleName + +// register this to Pickler and Unpickler +def register(): Unit = { + Pickler.registerCustomPickler(this.getClass, this) + Pickler.registerCustomPickler(cls, this) + Unpickler.registerConstructor(module, name, this) +} - def count: Long = summary.count +def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = { + if (obj == this) { +out.write(Opcodes.GLOBAL) +out.write((module + "\n" + name + "\n").getBytes()) + } else { +pickler.save(this) // it will be memorized by Pickler +saveState(obj, out, pickler) +out.write(Opcodes.REDUCE) + } +} + +private[python] def saveObjects(out: OutputStream, pickler: Pickler, +objects: Any*) = { + if (objects.length == 0 || objects.length > 3) { +out.write(Opcodes.MARK) + } + objects.foreach(pickler.save(_)) + val code = objects.length match { +case 1 => Opcodes.TUPLE1 +case 2 => Opcodes.TUPLE2 +case 3 => Opcodes.TUPLE3 +case _ => Opcodes.TUPLE + } + out.write(code) +} - def numNonzeros: Array[Byte] = SerDe.serializeDoubleVector(summary.numNonzeros) +private[python] def saveState(obj: Object, out: OutputStream, pickler: Pickler) + } - def max: Array[Byte] = SerDe.serializeDoubleVector(summary.max) + // Pickler for DenseVector + private[python] class DenseVectorPickler +extends BasePickler[DenseVector] { - def min: Array[Byte] = SerDe.serializeDoubleVector(summary.min) -} +def saveState(obj: Object, out: OutputStream, pickler: Pickler) = { + val vector: DenseVector = obj.asInstanceOf[DenseVector] + saveObjects(out, pickler, vector.toArray) +} -/** - * SerDe utility functions for PythonMLLibAPI. 
- */ -private[spark] object SerDe extends Serializable { - private val DENSE_VECTOR_MAGIC: Byte = 1 - private val SPARSE_VECTOR_MAGIC: Byte = 2 - private val DENSE_MATRIX_MAGIC: Byte = 3 - private val LABELED_POINT_MAGIC: Byte = 4 - - private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = { -require(bytes.length - offset >= 5, "Byte array too short") -val magic = bytes(offset) -if (magic == DENSE_VECTOR_MAGIC) { - deserializeDenseVector(bytes, offset) -} else if (magic == SPARSE_VECTOR_MAGIC) { - deserializeSparseVector(bytes, offset) -} else { - throw new IllegalArgumentException("Magic " + magic + " is wrong.") +def construct(args: Array[Object]) :Object = { + require(args.length == 1) + new DenseVector(args(0).asInstanceOf[Array[Double]]) } } - private[python] def deserializeDouble(bytes: Array[Byte], offset: Int = 0): Double = { -require(bytes.length - offset == 8, "Wrong size byte array for Double") -val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-56112010 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/132/consoleFull) for PR 2378 at commit [`032cd62`](https://github.com/apache/spark/commit/032cd62cee6b2bd134f6b9017a7e68ef333990a5). * This patch merges cleanly.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-56110037 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20554/consoleFull) for PR 2378 at commit [`032cd62`](https://github.com/apache/spark/commit/032cd62cee6b2bd134f6b9017a7e68ef333990a5). * This patch merges cleanly.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-56110091 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20551/consoleFull) for PR 2378 at commit [`bd738ab`](https://github.com/apache/spark/commit/bd738abee534f467c0fb707f414d854af128fec5). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds no public classes.
Github user davies commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-56109944 @jkbradley I should have addressed all your comments, or left a reply where I have not yet figured out what to do. Thanks for reviewing this huge PR.
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17757949 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala --- @@ -476,259 +436,167 @@ class PythonMLLibAPI extends Serializable { numRows: Long, numCols: Int, numPartitions: java.lang.Integer, - seed: java.lang.Long): JavaRDD[Array[Byte]] = { + seed: java.lang.Long): JavaRDD[Vector] = { val parts = getNumPartitionsOrDefault(numPartitions, jsc) val s = getSeedOrDefault(seed) -RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector) +RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s) } } /** - * :: DeveloperApi :: - * MultivariateStatisticalSummary with Vector fields serialized. + * SerDe utility functions for PythonMLLibAPI. */ -@DeveloperApi -class MultivariateStatisticalSummarySerialized(val summary: MultivariateStatisticalSummary) - extends Serializable { +private[spark] object SerDe extends Serializable { - def mean: Array[Byte] = SerDe.serializeDoubleVector(summary.mean) + val PYSPARK_PACKAGE = "pyspark.mllib" - def variance: Array[Byte] = SerDe.serializeDoubleVector(summary.variance) + /** + * Base class used for pickle + */ + private[python] abstract class BasePickler[T: ClassTag] +extends IObjectPickler with IObjectConstructor { + +private val cls = implicitly[ClassTag[T]].runtimeClass +private val module = PYSPARK_PACKAGE + "." + cls.getName.split('.')(4) +private val name = cls.getSimpleName + +// register this to Pickler and Unpickler +def register(): Unit = { + Pickler.registerCustomPickler(this.getClass, this) + Pickler.registerCustomPickler(cls, this) + Unpickler.registerConstructor(module, name, this) +} - def count: Long = summary.count +def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = { + if (obj == this) { +out.write(Opcodes.GLOBAL) +out.write((module + "\n" + name + "\n").getBytes()) + } else { +pickler.save(this) // it will be memorized by Pickler +saveState(obj, out, pickler) +out.write(Opcodes.REDUCE) + } +} + +private[python] def saveObjects(out: OutputStream, pickler: Pickler, +objects: Any*) = { + if (objects.length == 0 || objects.length > 3) { +out.write(Opcodes.MARK) + } + objects.foreach(pickler.save(_)) + val code = objects.length match { +case 1 => Opcodes.TUPLE1 +case 2 => Opcodes.TUPLE2 +case 3 => Opcodes.TUPLE3 +case _ => Opcodes.TUPLE + } + out.write(code) +} - def numNonzeros: Array[Byte] = SerDe.serializeDoubleVector(summary.numNonzeros) +private[python] def saveState(obj: Object, out: OutputStream, pickler: Pickler) + } - def max: Array[Byte] = SerDe.serializeDoubleVector(summary.max) + // Pickler for DenseVector + private[python] class DenseVectorPickler +extends BasePickler[DenseVector] { - def min: Array[Byte] = SerDe.serializeDoubleVector(summary.min) -} +def saveState(obj: Object, out: OutputStream, pickler: Pickler) = { + val vector: DenseVector = obj.asInstanceOf[DenseVector] + saveObjects(out, pickler, vector.toArray) +} -/** - * SerDe utility functions for PythonMLLibAPI. 
- */ -private[spark] object SerDe extends Serializable { - private val DENSE_VECTOR_MAGIC: Byte = 1 - private val SPARSE_VECTOR_MAGIC: Byte = 2 - private val DENSE_MATRIX_MAGIC: Byte = 3 - private val LABELED_POINT_MAGIC: Byte = 4 - - private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = { -require(bytes.length - offset >= 5, "Byte array too short") -val magic = bytes(offset) -if (magic == DENSE_VECTOR_MAGIC) { - deserializeDenseVector(bytes, offset) -} else if (magic == SPARSE_VECTOR_MAGIC) { - deserializeSparseVector(bytes, offset) -} else { - throw new IllegalArgumentException("Magic " + magic + " is wrong.") +def construct(args: Array[Object]) :Object = { + require(args.length == 1) + new DenseVector(args(0).asInstanceOf[Array[Double]]) } } - private[python] def deserializeDouble(bytes: Array[Byte], offset: Int = 0): Double = { -require(bytes.length - offset == 8, "Wrong size byte array for Double") -val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17757207 --- Diff: python/pyspark/mllib/tests.py --- @@ -198,41 +212,36 @@ def test_serialize(self): lil[1, 0] = 1 lil[3, 0] = 2 sv = SparseVector(4, {1: 1, 3: 2}) -self.assertEquals(sv, _convert_vector(lil)) -self.assertEquals(sv, _convert_vector(lil.tocsc())) -self.assertEquals(sv, _convert_vector(lil.tocoo())) -self.assertEquals(sv, _convert_vector(lil.tocsr())) -self.assertEquals(sv, _convert_vector(lil.todok())) -self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil))) -self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsc( -self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsr( -self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.todok( +self.assertEquals(sv, _convert_to_vector(lil)) +self.assertEquals(sv, _convert_to_vector(lil.tocsc())) +self.assertEquals(sv, _convert_to_vector(lil.tocoo())) +self.assertEquals(sv, _convert_to_vector(lil.tocsr())) +self.assertEquals(sv, _convert_to_vector(lil.todok())) + +def serialize(l): +return ser.loads(ser.dumps(_convert_to_vector(l))) +self.assertEquals(sv, serialize(lil)) +self.assertEquals(sv, serialize(lil.tocsc())) +self.assertEquals(sv, serialize(lil.tocsr())) +self.assertEquals(sv, serialize(lil.todok())) def test_dot(self): from scipy.sparse import lil_matrix lil = lil_matrix((4, 1)) lil[1, 0] = 1 lil[3, 0] = 2 -dv = array([1., 2., 3., 4.]) -sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4}) -mat = array([[1., 2., 3., 4.], - [1., 2., 3., 4.], - [1., 2., 3., 4.], - [1., 2., 3., 4.]]) -self.assertEquals(10.0, _dot(lil, dv)) -self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(lil, mat))) +dv = DenseVector(array([1., 2., 3., 4.])) +self.assertEquals(10.0, dv.dot(lil)) --- End diff -- Your comment is good, I really worry about that whether I had missed something, it will be bad to have functionality regression. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17756431 --- Diff: python/pyspark/mllib/tests.py --- @@ -198,41 +212,36 @@ def test_serialize(self): lil[1, 0] = 1 lil[3, 0] = 2 sv = SparseVector(4, {1: 1, 3: 2}) -self.assertEquals(sv, _convert_vector(lil)) -self.assertEquals(sv, _convert_vector(lil.tocsc())) -self.assertEquals(sv, _convert_vector(lil.tocoo())) -self.assertEquals(sv, _convert_vector(lil.tocsr())) -self.assertEquals(sv, _convert_vector(lil.todok())) -self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil))) -self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsc( -self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsr( -self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.todok( +self.assertEquals(sv, _convert_to_vector(lil)) +self.assertEquals(sv, _convert_to_vector(lil.tocsc())) +self.assertEquals(sv, _convert_to_vector(lil.tocoo())) +self.assertEquals(sv, _convert_to_vector(lil.tocsr())) +self.assertEquals(sv, _convert_to_vector(lil.todok())) + +def serialize(l): +return ser.loads(ser.dumps(_convert_to_vector(l))) +self.assertEquals(sv, serialize(lil)) +self.assertEquals(sv, serialize(lil.tocsc())) +self.assertEquals(sv, serialize(lil.tocsr())) +self.assertEquals(sv, serialize(lil.todok())) def test_dot(self): from scipy.sparse import lil_matrix lil = lil_matrix((4, 1)) lil[1, 0] = 1 lil[3, 0] = 2 -dv = array([1., 2., 3., 4.]) -sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4}) -mat = array([[1., 2., 3., 4.], - [1., 2., 3., 4.], - [1., 2., 3., 4.], - [1., 2., 3., 4.]]) -self.assertEquals(10.0, _dot(lil, dv)) -self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(lil, mat))) +dv = DenseVector(array([1., 2., 3., 4.])) +self.assertEquals(10.0, dv.dot(lil)) --- End diff -- I guess the assumption in the old code was that one of the arguments of dot() would be an array represented as a 1-dimensional matrix. Now that I think about it, that functionality is better left as traditional matrix-vector multiplication. I revoke my previous comment! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
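To make that distinction concrete, here is a small NumPy illustration using the same values as the removed test quoted above; the snippet is only a sketch of the plain vector-matrix reading, not code from the PR.

```python
# Vector-matrix multiplication with the numbers from the old test_dot test.
import numpy as np

v = np.array([0., 1., 0., 2.])            # dense view of lil_matrix((4, 1)) with lil[1,0]=1, lil[3,0]=2
mat = np.array([[1., 2., 3., 4.]] * 4)    # the 4x4 matrix used in the removed assertion
print(np.dot(v, mat))                     # [ 3.  6.  9. 12.] -- one dot product per column
```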
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-56104238 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20551/consoleFull) for PR 2378 at commit [`bd738ab`](https://github.com/apache/spark/commit/bd738abee534f467c0fb707f414d854af128fec5). * This patch merges cleanly.
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17752597 --- Diff: python/pyspark/mllib/linalg.py --- @@ -23,14 +23,148 @@ SciPy is available in their environment. """ -import numpy -from numpy import array, array_equal, ndarray, float64, int32 +import sys +import array +import copy_reg +import numpy as np -__all__ = ['SparseVector', 'Vectors'] +__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors'] -class SparseVector(object): +if sys.version_info[:2] == (2, 7): +# speed up pickling array in Python 2.7 +def fast_pickle_array(ar): +return array.array, (ar.typecode, ar.tostring()) +copy_reg.pickle(array.array, fast_pickle_array) + + +# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods, +# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices. + +try: +import scipy.sparse +_have_scipy = True +except: +# No SciPy in environment, but that's okay +_have_scipy = False + + +def _convert_to_vector(l): +if isinstance(l, Vector): +return l +elif type(l) in (array.array, np.array, np.ndarray, list, tuple): +return DenseVector(l) +elif _have_scipy and scipy.sparse.issparse(l): +assert l.shape[1] == 1, "Expected column vector" +csc = l.tocsc() +return SparseVector(l.shape[0], csc.indices, csc.data) +else: +raise TypeError("Cannot convert type %s into Vector" % type(l)) + + +class Vector(object): +""" +Abstract class for DenseVector and SparseVector +""" +def toArray(self): +""" +Convert the vector into an numpy.ndarray +:return: numpy.ndarray +""" +raise NotImplementedError + + +class DenseVector(Vector): +def __init__(self, ar): +if not isinstance(ar, array.array): +ar = array.array('d', ar) +self.array = ar + +def __reduce__(self): +return DenseVector, (self.array,) + +def dot(self, other): +""" +Compute the dot product of two Vectors. We support +(Numpy array, list, SparseVector, or SciPy sparse) +and a target NumPy array that is either 1- or 2-dimensional. +Equivalent to calling numpy.dot of the two vectors. + +>>> dense = DenseVector(array.array('d', [1., 2.])) +>>> dense.dot(dense) +5.0 +>>> dense.dot(SparseVector(2, [0, 1], [2., 1.])) +4.0 +>>> dense.dot(range(1, 3)) +5.0 +>>> dense.dot(np.array(range(1, 3))) +5.0 +""" +if isinstance(other, SparseVector): +return other.dot(self) +elif _have_scipy and scipy.sparse.issparse(other): +return other.transpose().dot(self.toArray())[0] +elif isinstance(other, Vector): +return np.dot(self.toArray(), other.toArray()) +else: +return np.dot(self.toArray(), other) + +def squared_distance(self, other): +""" +Squared distance of two Vectors. + +>>> dense1 = DenseVector(array.array('d', [1., 2.])) +>>> dense1.squared_distance(dense1) +0.0 +>>> dense2 = np.array([2., 1.]) +>>> dense1.squared_distance(dense2) +2.0 +>>> dense3 = [2., 1.] +>>> dense1.squared_distance(dense2) --- End diff -- good catch! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17752588 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala --- @@ -64,6 +64,12 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) override def toArray: Array[Double] = values private[mllib] override def toBreeze: BM[Double] = new BDM[Double](numRows, numCols, values) + + override def equals(o: Any) = o match { --- End diff -- good catch!
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17752465 --- Diff: python/pyspark/mllib/linalg.py --- @@ -23,14 +23,148 @@ SciPy is available in their environment. """ -import numpy -from numpy import array, array_equal, ndarray, float64, int32 +import sys +import array +import copy_reg +import numpy as np -__all__ = ['SparseVector', 'Vectors'] +__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors'] -class SparseVector(object): +if sys.version_info[:2] == (2, 7): +# speed up pickling array in Python 2.7 +def fast_pickle_array(ar): +return array.array, (ar.typecode, ar.tostring()) +copy_reg.pickle(array.array, fast_pickle_array) + + +# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods, +# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices. + +try: +import scipy.sparse +_have_scipy = True +except: +# No SciPy in environment, but that's okay +_have_scipy = False + + +def _convert_to_vector(l): +if isinstance(l, Vector): +return l +elif type(l) in (array.array, np.array, np.ndarray, list, tuple): +return DenseVector(l) +elif _have_scipy and scipy.sparse.issparse(l): +assert l.shape[1] == 1, "Expected column vector" +csc = l.tocsc() +return SparseVector(l.shape[0], csc.indices, csc.data) +else: +raise TypeError("Cannot convert type %s into Vector" % type(l)) + + +class Vector(object): +""" +Abstract class for DenseVector and SparseVector +""" +def toArray(self): +""" +Convert the vector into an numpy.ndarray +:return: numpy.ndarray +""" +raise NotImplementedError + + +class DenseVector(Vector): +def __init__(self, ar): +if not isinstance(ar, array.array): +ar = array.array('d', ar) +self.array = ar + +def __reduce__(self): +return DenseVector, (self.array,) + +def dot(self, other): +""" +Compute the dot product of two Vectors. We support +(Numpy array, list, SparseVector, or SciPy sparse) +and a target NumPy array that is either 1- or 2-dimensional. +Equivalent to calling numpy.dot of the two vectors. + +>>> dense = DenseVector(array.array('d', [1., 2.])) +>>> dense.dot(dense) +5.0 +>>> dense.dot(SparseVector(2, [0, 1], [2., 1.])) +4.0 +>>> dense.dot(range(1, 3)) +5.0 +>>> dense.dot(np.array(range(1, 3))) +5.0 +""" +if isinstance(other, SparseVector): +return other.dot(self) +elif _have_scipy and scipy.sparse.issparse(other): +return other.transpose().dot(self.toArray())[0] +elif isinstance(other, Vector): +return np.dot(self.toArray(), other.toArray()) --- End diff -- Yes, there is performance penalty right now. In the case of most of the algorithm are run inside Scala, these type conversion should not be the bottle necks. If we do see this to slow down something, will improve them later. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17752055 --- Diff: python/pyspark/mllib/linalg.py --- @@ -257,10 +410,34 @@ def stringify(vector): >>> Vectors.stringify(Vectors.dense([0.0, 1.0])) '[0.0,1.0]' """ -if type(vector) == SparseVector: -return str(vector) -else: -return "[" + ",".join([str(v) for v in vector]) + "]" +return str(vector) + + +class Matrix(Vector): --- End diff -- In the past, we used ndarray for both vectors and matrices, so _dot(matrix, matrix) was provided by ndarray. How should we handle _dot(vector, matrix) and _dot(matrix, matrix) now?
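What the old ndarray-based representation gave for free is that a single call covered all of those cases. The snippet below only illustrates that point with made-up values; it is not a proposal for the new Vector/Matrix classes.

```python
# numpy.dot handles vector-vector, vector-matrix and matrix-matrix uniformly on ndarrays.
import numpy as np

v = np.array([1., 2.])
m = np.array([[1., 0.], [0., 1.]])
print(np.dot(v, v))   # vector-vector -> scalar
print(np.dot(v, m))   # vector-matrix -> 1-D array
print(np.dot(m, m))   # matrix-matrix -> 2-D array
```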
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17751963 --- Diff: python/pyspark/mllib/tests.py --- @@ -198,41 +212,36 @@ def test_serialize(self): lil[1, 0] = 1 lil[3, 0] = 2 sv = SparseVector(4, {1: 1, 3: 2}) -self.assertEquals(sv, _convert_vector(lil)) -self.assertEquals(sv, _convert_vector(lil.tocsc())) -self.assertEquals(sv, _convert_vector(lil.tocoo())) -self.assertEquals(sv, _convert_vector(lil.tocsr())) -self.assertEquals(sv, _convert_vector(lil.todok())) -self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil))) -self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsc( -self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsr( -self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.todok( +self.assertEquals(sv, _convert_to_vector(lil)) +self.assertEquals(sv, _convert_to_vector(lil.tocsc())) +self.assertEquals(sv, _convert_to_vector(lil.tocoo())) +self.assertEquals(sv, _convert_to_vector(lil.tocsr())) +self.assertEquals(sv, _convert_to_vector(lil.todok())) + +def serialize(l): +return ser.loads(ser.dumps(_convert_to_vector(l))) +self.assertEquals(sv, serialize(lil)) +self.assertEquals(sv, serialize(lil.tocsc())) +self.assertEquals(sv, serialize(lil.tocsr())) +self.assertEquals(sv, serialize(lil.todok())) def test_dot(self): from scipy.sparse import lil_matrix lil = lil_matrix((4, 1)) lil[1, 0] = 1 lil[3, 0] = 2 -dv = array([1., 2., 3., 4.]) -sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4}) -mat = array([[1., 2., 3., 4.], - [1., 2., 3., 4.], - [1., 2., 3., 4.], - [1., 2., 3., 4.]]) -self.assertEquals(10.0, _dot(lil, dv)) -self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(lil, mat))) +dv = DenseVector(array([1., 2., 3., 4.])) +self.assertEquals(10.0, dv.dot(lil)) --- End diff -- lil and mat are both matrix, the dot of them are needed in which module/algorithm? I did not figure out how to deal with them now. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-55987147 @davies This looks like a great PR! I don't see major issues, though +1 to the remarks about checking for performance regressions. Pending performance testing and my small comments, this looks good to me.
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17703595 --- Diff: python/pyspark/mllib/tree.py --- @@ -90,53 +89,24 @@ class DecisionTree(object): EXPERIMENTAL: This is an experimental API. It will probably be modified for Spark v1.2. -Example usage: - ->>> from numpy import array ->>> import sys ->>> from pyspark.mllib.regression import LabeledPoint ->>> from pyspark.mllib.tree import DecisionTree ->>> from pyspark.mllib.linalg import SparseVector ->>> ->>> data = [ -... LabeledPoint(0.0, [0.0]), -... LabeledPoint(1.0, [1.0]), -... LabeledPoint(1.0, [2.0]), -... LabeledPoint(1.0, [3.0]) -... ] ->>> categoricalFeaturesInfo = {} # no categorical features ->>> model = DecisionTree.trainClassifier(sc.parallelize(data), numClasses=2, -... categoricalFeaturesInfo=categoricalFeaturesInfo) ->>> sys.stdout.write(model) -DecisionTreeModel classifier - If (feature 0 <= 0.5) - Predict: 0.0 - Else (feature 0 > 0.5) - Predict: 1.0 ->>> model.predict(array([1.0])) > 0 -True ->>> model.predict(array([0.0])) == 0 -True ->>> sparse_data = [ -... LabeledPoint(0.0, SparseVector(2, {0: 0.0})), -... LabeledPoint(1.0, SparseVector(2, {1: 1.0})), -... LabeledPoint(0.0, SparseVector(2, {0: 0.0})), -... LabeledPoint(1.0, SparseVector(2, {1: 2.0})) -... ] ->>> ->>> model = DecisionTree.trainRegressor(sc.parallelize(sparse_data), -... categoricalFeaturesInfo=categoricalFeaturesInfo) ->>> model.predict(array([0.0, 1.0])) == 1 -True ->>> model.predict(array([0.0, 0.0])) == 0 -True ->>> model.predict(SparseVector(2, {1: 1.0})) == 1 -True ->>> model.predict(SparseVector(2, {1: 0.0})) == 0 -True """ @staticmethod +def _train(data, type, numClasses, categoricalFeaturesInfo, + impurity="gini", maxDepth=5, maxBins=32, minInstancesPerNode=1, + minInfoGain=0.0): +first = data.first() +assert isinstance(first, LabeledPoint), "the data should be RDD of LabeleddPoint" --- End diff -- "LabeleddPoint" --> "LabeledPoint" --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17703466 --- Diff: python/pyspark/mllib/tests.py --- @@ -198,41 +212,36 @@ def test_serialize(self): lil[1, 0] = 1 lil[3, 0] = 2 sv = SparseVector(4, {1: 1, 3: 2}) -self.assertEquals(sv, _convert_vector(lil)) -self.assertEquals(sv, _convert_vector(lil.tocsc())) -self.assertEquals(sv, _convert_vector(lil.tocoo())) -self.assertEquals(sv, _convert_vector(lil.tocsr())) -self.assertEquals(sv, _convert_vector(lil.todok())) -self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil))) -self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsc( -self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsr( -self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.todok( +self.assertEquals(sv, _convert_to_vector(lil)) +self.assertEquals(sv, _convert_to_vector(lil.tocsc())) +self.assertEquals(sv, _convert_to_vector(lil.tocoo())) +self.assertEquals(sv, _convert_to_vector(lil.tocsr())) +self.assertEquals(sv, _convert_to_vector(lil.todok())) + +def serialize(l): +return ser.loads(ser.dumps(_convert_to_vector(l))) +self.assertEquals(sv, serialize(lil)) +self.assertEquals(sv, serialize(lil.tocsc())) +self.assertEquals(sv, serialize(lil.tocsr())) +self.assertEquals(sv, serialize(lil.todok())) def test_dot(self): from scipy.sparse import lil_matrix lil = lil_matrix((4, 1)) lil[1, 0] = 1 lil[3, 0] = 2 -dv = array([1., 2., 3., 4.]) -sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4}) -mat = array([[1., 2., 3., 4.], - [1., 2., 3., 4.], - [1., 2., 3., 4.], - [1., 2., 3., 4.]]) -self.assertEquals(10.0, _dot(lil, dv)) -self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(lil, mat))) +dv = DenseVector(array([1., 2., 3., 4.])) +self.assertEquals(10.0, dv.dot(lil)) --- End diff -- Why remove test of dot(vector, matrix)? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17702101 --- Diff: python/pyspark/mllib/linalg.py --- @@ -257,10 +410,34 @@ def stringify(vector): >>> Vectors.stringify(Vectors.dense([0.0, 1.0])) '[0.0,1.0]' """ -if type(vector) == SparseVector: -return str(vector) -else: -return "[" + ",".join([str(v) for v in vector]) + "]" +return str(vector) + + +class Matrix(Vector): --- End diff -- I'm not sure this should subclass Vector. A Matrix is not really a type of Vector.
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17702050 --- Diff: python/pyspark/mllib/linalg.py --- @@ -257,10 +410,34 @@ def stringify(vector): >>> Vectors.stringify(Vectors.dense([0.0, 1.0])) '[0.0,1.0]' """ -if type(vector) == SparseVector: -return str(vector) -else: -return "[" + ",".join([str(v) for v in vector]) + "]" +return str(vector) + + +class Matrix(Vector): +""" the Matrix """ + + +class DenseMatrix(Matrix): +def __init__(self, nRow, nCol, values): +assert len(values) == nRow * nCol +self.nRow = nRow --- End diff -- Should nRow and nCol not belong to the Matrix class?
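A minimal sketch of that suggestion, keeping the shape on the Matrix base class so subclasses only add their storage; this is hypothetical illustration code, not the PR's implementation.

```python
class Matrix(object):
    """Base class owning the shape, as suggested in the review comment."""
    def __init__(self, nRow, nCol):
        self.nRow = nRow
        self.nCol = nCol


class DenseMatrix(Matrix):
    def __init__(self, nRow, nCol, values):
        assert len(values) == nRow * nCol, "number of values does not match the given shape"
        Matrix.__init__(self, nRow, nCol)
        self.values = values


m = DenseMatrix(2, 2, [1.0, 2.0, 3.0, 4.0])
print(m.nRow, m.nCol)
```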
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17701626 --- Diff: python/pyspark/mllib/linalg.py --- @@ -61,16 +195,19 @@ def __init__(self, size, *args): if type(pairs) == dict: pairs = pairs.items() pairs = sorted(pairs) -self.indices = array([p[0] for p in pairs], dtype=int32) -self.values = array([p[1] for p in pairs], dtype=float64) +self.indices = array.array('i', [p[0] for p in pairs]) +self.values = array.array('d', [p[1] for p in pairs]) else: assert len(args[0]) == len(args[1]), "index and value arrays not same length" -self.indices = array(args[0], dtype=int32) -self.values = array(args[1], dtype=float64) +self.indices = array.array('i', args[0]) +self.values = array.array('d', args[1]) for i in xrange(len(self.indices) - 1): if self.indices[i] >= self.indices[i + 1]: raise TypeError("indices array must be sorted") +def __reduce__(self): +return (SparseVector, (self.size, self.indices, self.values)) + def dot(self, other): """ Dot product with a SparseVector or 1- or 2-dimensional Numpy array. --- End diff -- Update doc to say dot product with each column if given a 2-d array?
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17701227 --- Diff: python/pyspark/mllib/linalg.py --- @@ -61,16 +195,19 @@ def __init__(self, size, *args): if type(pairs) == dict: pairs = pairs.items() pairs = sorted(pairs) -self.indices = array([p[0] for p in pairs], dtype=int32) -self.values = array([p[1] for p in pairs], dtype=float64) +self.indices = array.array('i', [p[0] for p in pairs]) --- End diff -- Also maybe check that p is a pair. I could imagine someone writing a mistake like this: SparseVector(4, [(1, 2, 3), (1, 5, 0)]) as in: SparseVector(4, [(indices), (values)]) I think the code would not catch this.
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17701086 --- Diff: python/pyspark/mllib/linalg.py --- @@ -61,16 +195,19 @@ def __init__(self, size, *args): if type(pairs) == dict: pairs = pairs.items() pairs = sorted(pairs) -self.indices = array([p[0] for p in pairs], dtype=int32) -self.values = array([p[1] for p in pairs], dtype=float64) +self.indices = array.array('i', [p[0] for p in pairs]) --- End diff -- Check for duplicates?
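Taken together, this suggestion and the one in the previous comment amount to validating the (index, value) pairs before building the arrays. The helper below is a hypothetical sketch of what such a check could look like; the function name and error messages are made up.

```python
def _check_pairs(pairs):
    """Hypothetical validation for SparseVector.__init__ when given (index, value) pairs."""
    pairs = sorted(pairs)
    if any(not hasattr(p, "__len__") or len(p) != 2 for p in pairs):
        raise TypeError("expected a list of (index, value) pairs")
    indices = [p[0] for p in pairs]
    if len(indices) != len(set(indices)):
        raise ValueError("indices must not contain duplicates")
    return pairs


_check_pairs([(1, 1.0), (3, 2.0)])        # fine
# _check_pairs([(1, 2, 3), (1, 5, 0)])    # would raise TypeError: the mistake from the review
```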
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17700987 --- Diff: python/pyspark/mllib/linalg.py --- @@ -23,14 +23,148 @@ SciPy is available in their environment. """ -import numpy -from numpy import array, array_equal, ndarray, float64, int32 +import sys +import array +import copy_reg +import numpy as np -__all__ = ['SparseVector', 'Vectors'] +__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors'] -class SparseVector(object): +if sys.version_info[:2] == (2, 7): +# speed up pickling array in Python 2.7 +def fast_pickle_array(ar): +return array.array, (ar.typecode, ar.tostring()) +copy_reg.pickle(array.array, fast_pickle_array) + + +# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods, +# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices. + +try: +import scipy.sparse +_have_scipy = True +except: +# No SciPy in environment, but that's okay +_have_scipy = False + + +def _convert_to_vector(l): +if isinstance(l, Vector): +return l +elif type(l) in (array.array, np.array, np.ndarray, list, tuple): +return DenseVector(l) +elif _have_scipy and scipy.sparse.issparse(l): +assert l.shape[1] == 1, "Expected column vector" +csc = l.tocsc() +return SparseVector(l.shape[0], csc.indices, csc.data) +else: +raise TypeError("Cannot convert type %s into Vector" % type(l)) + + +class Vector(object): +""" +Abstract class for DenseVector and SparseVector +""" +def toArray(self): +""" +Convert the vector into an numpy.ndarray +:return: numpy.ndarray +""" +raise NotImplementedError + + +class DenseVector(Vector): +def __init__(self, ar): +if not isinstance(ar, array.array): +ar = array.array('d', ar) +self.array = ar + +def __reduce__(self): +return DenseVector, (self.array,) + +def dot(self, other): +""" +Compute the dot product of two Vectors. We support +(Numpy array, list, SparseVector, or SciPy sparse) +and a target NumPy array that is either 1- or 2-dimensional. +Equivalent to calling numpy.dot of the two vectors. + +>>> dense = DenseVector(array.array('d', [1., 2.])) +>>> dense.dot(dense) +5.0 +>>> dense.dot(SparseVector(2, [0, 1], [2., 1.])) +4.0 +>>> dense.dot(range(1, 3)) +5.0 +>>> dense.dot(np.array(range(1, 3))) +5.0 +""" +if isinstance(other, SparseVector): +return other.dot(self) +elif _have_scipy and scipy.sparse.issparse(other): +return other.transpose().dot(self.toArray())[0] +elif isinstance(other, Vector): +return np.dot(self.toArray(), other.toArray()) +else: +return np.dot(self.toArray(), other) + +def squared_distance(self, other): +""" +Squared distance of two Vectors. + +>>> dense1 = DenseVector(array.array('d', [1., 2.])) +>>> dense1.squared_distance(dense1) +0.0 +>>> dense2 = np.array([2., 1.]) +>>> dense1.squared_distance(dense2) +2.0 +>>> dense3 = [2., 1.] 
+>>> dense1.squared_distance(dense2) +2.0 +>>> sparse1 = SparseVector(2, [0, 1], [2., 1.]) +>>> dense1.squared_distance(sparse1) +2.0 +""" +if isinstance(other, SparseVector): +return other.squared_distance(self) +elif _have_scipy and scipy.sparse.issparse(other): +return _convert_to_vector(other).squared_distance(self) + +if isinstance(other, Vector): +other = other.toArray() +elif not isinstance(other, np.ndarray): +other = np.array(other) +diff = self.toArray() - other +return np.dot(diff, diff) + +def toArray(self): +return np.array(self.array) + +def __getitem__(self, item): +return self.array[item] + +def __len__(self): +return len(self.array) + +def __str__(self): +return "[" + ",".join([str(v) for v in self.array]) + "]" + +def __repr__(self): +return "DenseVector(%r)" % self.array + +def __eq__(self, other): +return isinstance(other, De
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17700471 --- Diff: python/pyspark/mllib/linalg.py --- @@ -23,14 +23,148 @@ SciPy is available in their environment. """ -import numpy -from numpy import array, array_equal, ndarray, float64, int32 +import sys +import array +import copy_reg +import numpy as np -__all__ = ['SparseVector', 'Vectors'] +__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors'] -class SparseVector(object): +if sys.version_info[:2] == (2, 7): +# speed up pickling array in Python 2.7 +def fast_pickle_array(ar): +return array.array, (ar.typecode, ar.tostring()) +copy_reg.pickle(array.array, fast_pickle_array) + + +# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods, +# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices. + +try: +import scipy.sparse +_have_scipy = True +except: +# No SciPy in environment, but that's okay +_have_scipy = False + + +def _convert_to_vector(l): +if isinstance(l, Vector): +return l +elif type(l) in (array.array, np.array, np.ndarray, list, tuple): +return DenseVector(l) +elif _have_scipy and scipy.sparse.issparse(l): +assert l.shape[1] == 1, "Expected column vector" +csc = l.tocsc() +return SparseVector(l.shape[0], csc.indices, csc.data) +else: +raise TypeError("Cannot convert type %s into Vector" % type(l)) + + +class Vector(object): +""" +Abstract class for DenseVector and SparseVector +""" +def toArray(self): +""" +Convert the vector into an numpy.ndarray +:return: numpy.ndarray +""" +raise NotImplementedError + + +class DenseVector(Vector): +def __init__(self, ar): +if not isinstance(ar, array.array): +ar = array.array('d', ar) +self.array = ar + +def __reduce__(self): +return DenseVector, (self.array,) + +def dot(self, other): +""" +Compute the dot product of two Vectors. We support +(Numpy array, list, SparseVector, or SciPy sparse) +and a target NumPy array that is either 1- or 2-dimensional. +Equivalent to calling numpy.dot of the two vectors. + +>>> dense = DenseVector(array.array('d', [1., 2.])) +>>> dense.dot(dense) +5.0 +>>> dense.dot(SparseVector(2, [0, 1], [2., 1.])) +4.0 +>>> dense.dot(range(1, 3)) +5.0 +>>> dense.dot(np.array(range(1, 3))) +5.0 +""" +if isinstance(other, SparseVector): +return other.dot(self) +elif _have_scipy and scipy.sparse.issparse(other): +return other.transpose().dot(self.toArray())[0] +elif isinstance(other, Vector): +return np.dot(self.toArray(), other.toArray()) --- End diff -- (This question applies to other functions too.) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17700424 --- Diff: python/pyspark/mllib/linalg.py --- @@ -23,14 +23,148 @@ SciPy is available in their environment. """ -import numpy -from numpy import array, array_equal, ndarray, float64, int32 +import sys +import array +import copy_reg +import numpy as np -__all__ = ['SparseVector', 'Vectors'] +__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors'] -class SparseVector(object): +if sys.version_info[:2] == (2, 7): +# speed up pickling array in Python 2.7 +def fast_pickle_array(ar): +return array.array, (ar.typecode, ar.tostring()) +copy_reg.pickle(array.array, fast_pickle_array) + + +# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods, +# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices. + +try: +import scipy.sparse +_have_scipy = True +except: +# No SciPy in environment, but that's okay +_have_scipy = False + + +def _convert_to_vector(l): +if isinstance(l, Vector): +return l +elif type(l) in (array.array, np.array, np.ndarray, list, tuple): +return DenseVector(l) +elif _have_scipy and scipy.sparse.issparse(l): +assert l.shape[1] == 1, "Expected column vector" +csc = l.tocsc() +return SparseVector(l.shape[0], csc.indices, csc.data) +else: +raise TypeError("Cannot convert type %s into Vector" % type(l)) + + +class Vector(object): +""" +Abstract class for DenseVector and SparseVector +""" +def toArray(self): +""" +Convert the vector into an numpy.ndarray +:return: numpy.ndarray +""" +raise NotImplementedError + + +class DenseVector(Vector): +def __init__(self, ar): +if not isinstance(ar, array.array): +ar = array.array('d', ar) +self.array = ar + +def __reduce__(self): +return DenseVector, (self.array,) + +def dot(self, other): +""" +Compute the dot product of two Vectors. We support +(Numpy array, list, SparseVector, or SciPy sparse) +and a target NumPy array that is either 1- or 2-dimensional. +Equivalent to calling numpy.dot of the two vectors. + +>>> dense = DenseVector(array.array('d', [1., 2.])) +>>> dense.dot(dense) +5.0 +>>> dense.dot(SparseVector(2, [0, 1], [2., 1.])) +4.0 +>>> dense.dot(range(1, 3)) +5.0 +>>> dense.dot(np.array(range(1, 3))) +5.0 +""" +if isinstance(other, SparseVector): +return other.dot(self) +elif _have_scipy and scipy.sparse.issparse(other): +return other.transpose().dot(self.toArray())[0] +elif isinstance(other, Vector): +return np.dot(self.toArray(), other.toArray()) +else: +return np.dot(self.toArray(), other) + +def squared_distance(self, other): +""" +Squared distance of two Vectors. + +>>> dense1 = DenseVector(array.array('d', [1., 2.])) +>>> dense1.squared_distance(dense1) +0.0 +>>> dense2 = np.array([2., 1.]) +>>> dense1.squared_distance(dense2) +2.0 +>>> dense3 = [2., 1.] +>>> dense1.squared_distance(dense2) --- End diff -- "dense2" --> "dense3" --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
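For reference, a sketch of the corrected doctest lines (using dense3 as intended), which still evaluate to the same distance:

    >>> dense3 = [2., 1.]
    >>> dense1.squared_distance(dense3)
    2.0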
[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17700232 --- Diff: python/pyspark/mllib/linalg.py --- @@ -23,14 +23,148 @@ SciPy is available in their environment. """ -import numpy -from numpy import array, array_equal, ndarray, float64, int32 +import sys +import array +import copy_reg +import numpy as np -__all__ = ['SparseVector', 'Vectors'] +__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors'] -class SparseVector(object): +if sys.version_info[:2] == (2, 7): +# speed up pickling array in Python 2.7 +def fast_pickle_array(ar): +return array.array, (ar.typecode, ar.tostring()) +copy_reg.pickle(array.array, fast_pickle_array) + + +# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods, +# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices. + +try: +import scipy.sparse +_have_scipy = True +except: +# No SciPy in environment, but that's okay +_have_scipy = False + + +def _convert_to_vector(l): +if isinstance(l, Vector): +return l +elif type(l) in (array.array, np.array, np.ndarray, list, tuple): +return DenseVector(l) +elif _have_scipy and scipy.sparse.issparse(l): +assert l.shape[1] == 1, "Expected column vector" +csc = l.tocsc() +return SparseVector(l.shape[0], csc.indices, csc.data) +else: +raise TypeError("Cannot convert type %s into Vector" % type(l)) + + +class Vector(object): +""" +Abstract class for DenseVector and SparseVector +""" +def toArray(self): +""" +Convert the vector into an numpy.ndarray +:return: numpy.ndarray +""" +raise NotImplementedError + + +class DenseVector(Vector): +def __init__(self, ar): +if not isinstance(ar, array.array): +ar = array.array('d', ar) +self.array = ar + +def __reduce__(self): +return DenseVector, (self.array,) + +def dot(self, other): +""" +Compute the dot product of two Vectors. We support +(Numpy array, list, SparseVector, or SciPy sparse) +and a target NumPy array that is either 1- or 2-dimensional. +Equivalent to calling numpy.dot of the two vectors. + +>>> dense = DenseVector(array.array('d', [1., 2.])) +>>> dense.dot(dense) +5.0 +>>> dense.dot(SparseVector(2, [0, 1], [2., 1.])) +4.0 +>>> dense.dot(range(1, 3)) +5.0 +>>> dense.dot(np.array(range(1, 3))) +5.0 +""" +if isinstance(other, SparseVector): +return other.dot(self) +elif _have_scipy and scipy.sparse.issparse(other): +return other.transpose().dot(self.toArray())[0] +elif isinstance(other, Vector): +return np.dot(self.toArray(), other.toArray()) --- End diff -- Just wondering: Is it inefficient to have to convert types by calling toArray()? Does that mean multiple passes over (or copies of) the data? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
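A small, Spark-independent illustration of the copy behavior in question: toArray() is implemented as np.array(self.array), and np.array always copies the underlying buffer, so each conversion is an extra O(n) allocation and pass over the data. One way to avoid the copy, shown here only as a sketch, is np.frombuffer, which builds a view over the same memory:

    import array
    import numpy as np

    ar = array.array('d', [1.0, 2.0])

    a1 = np.array(ar)    # copies the data, like DenseVector.toArray()
    a2 = np.array(ar)
    print(a1 is a2)      # False: every call allocates a fresh ndarray

    view = np.frombuffer(ar)        # shares ar's buffer instead of copying
    print(view.base is not None)    # True: a view, not a copy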
[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17698673 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala --- @@ -64,6 +64,12 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) override def toArray: Array[Double] = values private[mllib] override def toBreeze: BM[Double] = new BDM[Double](numRows, numCols, values) + + override def equals(o: Any) = o match { --- End diff -- Should this not check values? Even though that would be expensive, it should be necessary to match the expected behavior of equals(). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
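The same contract written as a Python/NumPy analogue (not the Scala fix itself, just the point being made): equality of dense matrices has to compare the values element-wise, not only the dimensions.

    import numpy as np

    def dense_matrices_equal(a, b):
        # dimensions alone are not enough; the values must match as well
        return a.shape == b.shape and np.array_equal(a, b)

    x = np.arange(6.0).reshape(2, 3)
    print(dense_matrices_equal(x, x.copy()))          # True
    print(dense_matrices_equal(x, np.zeros((2, 3))))  # False despite equal shapes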
[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17698519 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala --- @@ -476,259 +436,167 @@ class PythonMLLibAPI extends Serializable { numRows: Long, numCols: Int, numPartitions: java.lang.Integer, - seed: java.lang.Long): JavaRDD[Array[Byte]] = { + seed: java.lang.Long): JavaRDD[Vector] = { val parts = getNumPartitionsOrDefault(numPartitions, jsc) val s = getSeedOrDefault(seed) -RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector) +RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s) } } /** - * :: DeveloperApi :: - * MultivariateStatisticalSummary with Vector fields serialized. + * SerDe utility functions for PythonMLLibAPI. */ -@DeveloperApi -class MultivariateStatisticalSummarySerialized(val summary: MultivariateStatisticalSummary) - extends Serializable { +private[spark] object SerDe extends Serializable { - def mean: Array[Byte] = SerDe.serializeDoubleVector(summary.mean) + val PYSPARK_PACKAGE = "pyspark.mllib" - def variance: Array[Byte] = SerDe.serializeDoubleVector(summary.variance) + /** + * Base class used for pickle + */ + private[python] abstract class BasePickler[T: ClassTag] +extends IObjectPickler with IObjectConstructor { + +private val cls = implicitly[ClassTag[T]].runtimeClass +private val module = PYSPARK_PACKAGE + "." + cls.getName.split('.')(4) +private val name = cls.getSimpleName + +// register this to Pickler and Unpickler +def register(): Unit = { + Pickler.registerCustomPickler(this.getClass, this) + Pickler.registerCustomPickler(cls, this) + Unpickler.registerConstructor(module, name, this) +} - def count: Long = summary.count +def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = { + if (obj == this) { +out.write(Opcodes.GLOBAL) +out.write((module + "\n" + name + "\n").getBytes()) + } else { +pickler.save(this) // it will be memorized by Pickler +saveState(obj, out, pickler) +out.write(Opcodes.REDUCE) + } +} + +private[python] def saveObjects(out: OutputStream, pickler: Pickler, +objects: Any*) = { + if (objects.length == 0 || objects.length > 3) { +out.write(Opcodes.MARK) + } + objects.foreach(pickler.save(_)) + val code = objects.length match { +case 1 => Opcodes.TUPLE1 +case 2 => Opcodes.TUPLE2 +case 3 => Opcodes.TUPLE3 +case _ => Opcodes.TUPLE + } + out.write(code) +} - def numNonzeros: Array[Byte] = SerDe.serializeDoubleVector(summary.numNonzeros) +private[python] def saveState(obj: Object, out: OutputStream, pickler: Pickler) + } - def max: Array[Byte] = SerDe.serializeDoubleVector(summary.max) + // Pickler for DenseVector + private[python] class DenseVectorPickler +extends BasePickler[DenseVector] { - def min: Array[Byte] = SerDe.serializeDoubleVector(summary.min) -} +def saveState(obj: Object, out: OutputStream, pickler: Pickler) = { + val vector: DenseVector = obj.asInstanceOf[DenseVector] + saveObjects(out, pickler, vector.toArray) +} -/** - * SerDe utility functions for PythonMLLibAPI. 
- */ -private[spark] object SerDe extends Serializable { - private val DENSE_VECTOR_MAGIC: Byte = 1 - private val SPARSE_VECTOR_MAGIC: Byte = 2 - private val DENSE_MATRIX_MAGIC: Byte = 3 - private val LABELED_POINT_MAGIC: Byte = 4 - - private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = { -require(bytes.length - offset >= 5, "Byte array too short") -val magic = bytes(offset) -if (magic == DENSE_VECTOR_MAGIC) { - deserializeDenseVector(bytes, offset) -} else if (magic == SPARSE_VECTOR_MAGIC) { - deserializeSparseVector(bytes, offset) -} else { - throw new IllegalArgumentException("Magic " + magic + " is wrong.") +def construct(args: Array[Object]) :Object = { + require(args.length == 1) + new DenseVector(args(0).asInstanceOf[Array[Double]]) } } - private[python] def deserializeDouble(bytes: Array[Byte], offset: Int = 0): Double = { -require(bytes.length - offset == 8, "Wrong size byte array for Double") -val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17697397 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala --- @@ -476,259 +436,167 @@ class PythonMLLibAPI extends Serializable { numRows: Long, numCols: Int, numPartitions: java.lang.Integer, - seed: java.lang.Long): JavaRDD[Array[Byte]] = { + seed: java.lang.Long): JavaRDD[Vector] = { val parts = getNumPartitionsOrDefault(numPartitions, jsc) val s = getSeedOrDefault(seed) -RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector) +RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s) } } /** - * :: DeveloperApi :: - * MultivariateStatisticalSummary with Vector fields serialized. + * SerDe utility functions for PythonMLLibAPI. */ -@DeveloperApi -class MultivariateStatisticalSummarySerialized(val summary: MultivariateStatisticalSummary) - extends Serializable { +private[spark] object SerDe extends Serializable { - def mean: Array[Byte] = SerDe.serializeDoubleVector(summary.mean) + val PYSPARK_PACKAGE = "pyspark.mllib" - def variance: Array[Byte] = SerDe.serializeDoubleVector(summary.variance) + /** + * Base class used for pickle + */ + private[python] abstract class BasePickler[T: ClassTag] +extends IObjectPickler with IObjectConstructor { + +private val cls = implicitly[ClassTag[T]].runtimeClass +private val module = PYSPARK_PACKAGE + "." + cls.getName.split('.')(4) +private val name = cls.getSimpleName + +// register this to Pickler and Unpickler +def register(): Unit = { + Pickler.registerCustomPickler(this.getClass, this) + Pickler.registerCustomPickler(cls, this) + Unpickler.registerConstructor(module, name, this) +} - def count: Long = summary.count +def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = { + if (obj == this) { +out.write(Opcodes.GLOBAL) +out.write((module + "\n" + name + "\n").getBytes()) + } else { +pickler.save(this) // it will be memorized by Pickler +saveState(obj, out, pickler) +out.write(Opcodes.REDUCE) + } +} + +private[python] def saveObjects(out: OutputStream, pickler: Pickler, +objects: Any*) = { + if (objects.length == 0 || objects.length > 3) { +out.write(Opcodes.MARK) + } + objects.foreach(pickler.save(_)) + val code = objects.length match { +case 1 => Opcodes.TUPLE1 +case 2 => Opcodes.TUPLE2 +case 3 => Opcodes.TUPLE3 +case _ => Opcodes.TUPLE + } + out.write(code) +} - def numNonzeros: Array[Byte] = SerDe.serializeDoubleVector(summary.numNonzeros) +private[python] def saveState(obj: Object, out: OutputStream, pickler: Pickler) + } - def max: Array[Byte] = SerDe.serializeDoubleVector(summary.max) + // Pickler for DenseVector + private[python] class DenseVectorPickler +extends BasePickler[DenseVector] { - def min: Array[Byte] = SerDe.serializeDoubleVector(summary.min) -} +def saveState(obj: Object, out: OutputStream, pickler: Pickler) = { + val vector: DenseVector = obj.asInstanceOf[DenseVector] + saveObjects(out, pickler, vector.toArray) +} -/** - * SerDe utility functions for PythonMLLibAPI. 
- */ -private[spark] object SerDe extends Serializable { - private val DENSE_VECTOR_MAGIC: Byte = 1 - private val SPARSE_VECTOR_MAGIC: Byte = 2 - private val DENSE_MATRIX_MAGIC: Byte = 3 - private val LABELED_POINT_MAGIC: Byte = 4 - - private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = { -require(bytes.length - offset >= 5, "Byte array too short") -val magic = bytes(offset) -if (magic == DENSE_VECTOR_MAGIC) { - deserializeDenseVector(bytes, offset) -} else if (magic == SPARSE_VECTOR_MAGIC) { - deserializeSparseVector(bytes, offset) -} else { - throw new IllegalArgumentException("Magic " + magic + " is wrong.") +def construct(args: Array[Object]) :Object = { + require(args.length == 1) + new DenseVector(args(0).asInstanceOf[Array[Double]]) } } - private[python] def deserializeDouble(bytes: Array[Byte], offset: Int = 0): Double = { -require(bytes.length - offset == 8, "Wrong size byte array for Double") -val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
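To see the wire format the picklers in the quoted SerDe code agree on, one can dump a DenseVector from the Python side and disassemble the bytes (a sketch; assumes the pyspark package from this PR is on the Python path). The GLOBAL opcode carries the module and class name that Unpickler.registerConstructor maps back to the JVM-side constructor, and REDUCE applies that constructor to the saved arguments, mirroring the pickle() method in the quoted BasePickler:

    import pickle
    import pickletools
    from pyspark.mllib.linalg import DenseVector

    buf = pickle.dumps(DenseVector([1.0, 2.0]), protocol=2)
    # expect roughly: GLOBAL 'pyspark.mllib.linalg DenseVector', the argument tuple, then REDUCE
    pickletools.dis(buf)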
[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17697102 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala --- @@ -476,259 +436,167 @@ class PythonMLLibAPI extends Serializable { numRows: Long, numCols: Int, numPartitions: java.lang.Integer, - seed: java.lang.Long): JavaRDD[Array[Byte]] = { + seed: java.lang.Long): JavaRDD[Vector] = { val parts = getNumPartitionsOrDefault(numPartitions, jsc) val s = getSeedOrDefault(seed) -RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector) +RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s) } } /** - * :: DeveloperApi :: - * MultivariateStatisticalSummary with Vector fields serialized. + * SerDe utility functions for PythonMLLibAPI. */ -@DeveloperApi -class MultivariateStatisticalSummarySerialized(val summary: MultivariateStatisticalSummary) - extends Serializable { +private[spark] object SerDe extends Serializable { - def mean: Array[Byte] = SerDe.serializeDoubleVector(summary.mean) + val PYSPARK_PACKAGE = "pyspark.mllib" - def variance: Array[Byte] = SerDe.serializeDoubleVector(summary.variance) + /** + * Base class used for pickle + */ + private[python] abstract class BasePickler[T: ClassTag] +extends IObjectPickler with IObjectConstructor { + +private val cls = implicitly[ClassTag[T]].runtimeClass +private val module = PYSPARK_PACKAGE + "." + cls.getName.split('.')(4) +private val name = cls.getSimpleName + +// register this to Pickler and Unpickler +def register(): Unit = { + Pickler.registerCustomPickler(this.getClass, this) + Pickler.registerCustomPickler(cls, this) + Unpickler.registerConstructor(module, name, this) +} - def count: Long = summary.count +def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = { + if (obj == this) { +out.write(Opcodes.GLOBAL) +out.write((module + "\n" + name + "\n").getBytes()) + } else { +pickler.save(this) // it will be memorized by Pickler +saveState(obj, out, pickler) +out.write(Opcodes.REDUCE) + } +} + +private[python] def saveObjects(out: OutputStream, pickler: Pickler, +objects: Any*) = { --- End diff -- Can fit def on 1 line. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17694320 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala --- @@ -40,11 +43,11 @@ import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils +import scala.reflect.ClassTag --- End diff -- Imports out of order. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17693196 --- Diff: python/pyspark/mllib/recommendation.py --- @@ -54,34 +64,51 @@ def __del__(self): def predict(self, user, product): return self._java_model.predict(user, product) -def predictAll(self, usersProducts): -usersProductsJRDD = _get_unmangled_rdd(usersProducts, _serialize_tuple) -return RDD(self._java_model.predict(usersProductsJRDD._jrdd), - self._context, RatingDeserializer()) +def predictAll(self, user_product): +assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)" +sc = self._context +tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd()) +jresult = self._java_model.predict(tuplerdd).toJavaRDD() +return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc, + AutoBatchedSerializer(PickleSerializer())) class ALS(object): @classmethod +def _prepare(cls, ratings): +assert isinstance(ratings, RDD), "ratings should be RDD" +first = ratings.first() +if not isinstance(first, Rating): +if isinstance(first, (tuple, list)): +ratings = ratings.map(lambda x: Rating(*x)) +else: +raise ValueError("rating should be RDD of Rating or tuple/list") +# serialize them by AutoBatchedSerializer before cache to reduce the +# objects overhead in JVM +cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache() --- End diff -- Concerning DecisionTree, this is something to be tested and possibly fixed. I suspect it will be better to cache, but need to test. Some tests do show that the final read and persist take much longer than the initial reads, so caching will not help. But other tests indicate it may be worthwhile. I think it is data-dependent, as suggested above. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...
Github user staple commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17687682 --- Diff: python/pyspark/mllib/recommendation.py --- @@ -54,34 +64,51 @@ def __del__(self): def predict(self, user, product): return self._java_model.predict(user, product) -def predictAll(self, usersProducts): -usersProductsJRDD = _get_unmangled_rdd(usersProducts, _serialize_tuple) -return RDD(self._java_model.predict(usersProductsJRDD._jrdd), - self._context, RatingDeserializer()) +def predictAll(self, user_product): +assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)" +sc = self._context +tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd()) +jresult = self._java_model.predict(tuplerdd).toJavaRDD() +return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc, + AutoBatchedSerializer(PickleSerializer())) class ALS(object): @classmethod +def _prepare(cls, ratings): +assert isinstance(ratings, RDD), "ratings should be RDD" +first = ratings.first() +if not isinstance(first, Rating): +if isinstance(first, (tuple, list)): +ratings = ratings.map(lambda x: Rating(*x)) +else: +raise ValueError("rating should be RDD of Rating or tuple/list") +# serialize them by AutoBatchedSerializer before cache to reduce the +# objects overhead in JVM +cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache() --- End diff -- FWIW it looks like the DecisionTree learner can do multiple reads (of varying types) of its input. In DecisionTree.train, DecisionTreeMetadata.buildMetadata does a count() on the input, DecisionTree.findSplitsBins can do a sampled read, and then TreePoint.convertToTreeRDD will do a mapped read that gets persisted. I'm not knowledgable enough to know how expensive that initial count() will be without further investigation. But I think for DecisionTree the suggestion was not to cache before learning. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17686887 --- Diff: python/pyspark/mllib/recommendation.py --- @@ -54,34 +64,51 @@ def __del__(self): def predict(self, user, product): return self._java_model.predict(user, product) -def predictAll(self, usersProducts): -usersProductsJRDD = _get_unmangled_rdd(usersProducts, _serialize_tuple) -return RDD(self._java_model.predict(usersProductsJRDD._jrdd), - self._context, RatingDeserializer()) +def predictAll(self, user_product): +assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)" +sc = self._context +tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd()) +jresult = self._java_model.predict(tuplerdd).toJavaRDD() +return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc, + AutoBatchedSerializer(PickleSerializer())) class ALS(object): @classmethod +def _prepare(cls, ratings): +assert isinstance(ratings, RDD), "ratings should be RDD" +first = ratings.first() +if not isinstance(first, Rating): +if isinstance(first, (tuple, list)): +ratings = ratings.map(lambda x: Rating(*x)) +else: +raise ValueError("rating should be RDD of Rating or tuple/list") +# serialize them by AutoBatchedSerializer before cache to reduce the +# objects overhead in JVM +cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache() --- End diff -- @mengxr Any comments about it? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17686849 --- Diff: python/pyspark/mllib/recommendation.py --- @@ -54,34 +64,51 @@ def __del__(self): def predict(self, user, product): return self._java_model.predict(user, product) -def predictAll(self, usersProducts): -usersProductsJRDD = _get_unmangled_rdd(usersProducts, _serialize_tuple) -return RDD(self._java_model.predict(usersProductsJRDD._jrdd), - self._context, RatingDeserializer()) +def predictAll(self, user_product): +assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)" +sc = self._context +tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd()) +jresult = self._java_model.predict(tuplerdd).toJavaRDD() +return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc, + AutoBatchedSerializer(PickleSerializer())) class ALS(object): @classmethod +def _prepare(cls, ratings): +assert isinstance(ratings, RDD), "ratings should be RDD" +first = ratings.first() +if not isinstance(first, Rating): +if isinstance(first, (tuple, list)): +ratings = ratings.map(lambda x: Rating(*x)) +else: +raise ValueError("rating should be RDD of Rating or tuple/list") +# serialize them by AutoBatchedSerializer before cache to reduce the +# objects overhead in JVM +cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache() --- End diff -- I'm not sure: caching would be better for small datasets but worse for large ones (it may cause OOM in some cases). In this PR I kept the same behavior as before, since I'm not sure how best to deal with it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
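A user-level sketch of the trade-off under discussion (illustrative only; assumes a live SparkContext sc, and uses the internal _reserialize helper just to show what _prepare does): batching many small Rating objects with AutoBatchedSerializer keeps per-object overhead in the JVM low, and cache() avoids redoing the Python-to-JVM conversion when the trainer reads its input more than once, at the cost of holding the cached partitions in memory.

    from pyspark.mllib.recommendation import ALS, Rating
    from pyspark.serializers import AutoBatchedSerializer, PickleSerializer

    ratings = sc.parallelize([(1, 1, 5.0), (1, 2, 1.0), (2, 1, 4.0)]).map(lambda x: Rating(*x))

    # roughly what _prepare does before handing the data to the JVM-side trainer
    prepared = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache()

    # callers use the public API; the caching above happens inside train()
    model = ALS.train(ratings, 10, 5)   # rank=10, iterations=5
    print(model.predict(1, 2))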
[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...
Github user staple commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17686208 --- Diff: python/pyspark/mllib/recommendation.py --- @@ -54,34 +64,51 @@ def __del__(self): def predict(self, user, product): return self._java_model.predict(user, product) -def predictAll(self, usersProducts): -usersProductsJRDD = _get_unmangled_rdd(usersProducts, _serialize_tuple) -return RDD(self._java_model.predict(usersProductsJRDD._jrdd), - self._context, RatingDeserializer()) +def predictAll(self, user_product): +assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)" +sc = self._context +tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd()) +jresult = self._java_model.predict(tuplerdd).toJavaRDD() +return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc, + AutoBatchedSerializer(PickleSerializer())) class ALS(object): @classmethod +def _prepare(cls, ratings): +assert isinstance(ratings, RDD), "ratings should be RDD" +first = ratings.first() +if not isinstance(first, Rating): +if isinstance(first, (tuple, list)): +ratings = ratings.map(lambda x: Rating(*x)) +else: +raise ValueError("rating should be RDD of Rating or tuple/list") +# serialize them by AutoBatchedSerializer before cache to reduce the +# objects overhead in JVM +cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache() --- End diff -- Hi, just wanted to check on your decision to cache for ALS. It looks like in ALS the makeLinkRDDs calls handle persistence for a transformation of the input data. Though there are two calls to makeLinkRDDs, so perhaps two reads of the input data. Are those two reads the reason for caching here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-55916761 @mengxr it's ready to review now, thanks. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-55860805 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20453/consoleFull) for PR 2378 at commit [`e431377`](https://github.com/apache/spark/commit/e431377170172571974aadcae7ff42d3a79e2cd9). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-55860743 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/122/consoleFull) for PR 2378 at commit [`e431377`](https://github.com/apache/spark/commit/e431377170172571974aadcae7ff42d3a79e2cd9). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-55855377 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20453/consoleFull) for PR 2378 at commit [`e431377`](https://github.com/apache/spark/commit/e431377170172571974aadcae7ff42d3a79e2cd9). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2378#issuecomment-55855348 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/122/consoleFull) for PR 2378 at commit [`e431377`](https://github.com/apache/spark/commit/e431377170172571974aadcae7ff42d3a79e2cd9). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org