Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17574784
  
    --- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala ---
    @@ -472,214 +452,140 @@ class PythonMLLibAPI extends Serializable {
           numRows: Long,
           numCols: Int,
           numPartitions: java.lang.Integer,
    -      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
    +      seed: java.lang.Long): JavaRDD[Vector] = {
         val parts = getNumPartitionsOrDefault(numPartitions, jsc)
         val s = getSeedOrDefault(seed)
    -    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, 
s).map(SerDe.serializeDoubleVector)
    +    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s)
       }
     
     }
     
     /**
    - * :: DeveloperApi ::
    - * MultivariateStatisticalSummary with Vector fields serialized.
    + * SerDe utility functions for PythonMLLibAPI.
      */
    -@DeveloperApi
    -class MultivariateStatisticalSummarySerialized(val summary: 
MultivariateStatisticalSummary)
    -  extends Serializable {
    +private[spark] object SerDe extends Serializable {
     
    -  def mean: Array[Byte] = SerDe.serializeDoubleVector(summary.mean)
    +  private[python] def reduce_object(out: OutputStream, pickler: Pickler,
    +                                    module: String, name: String, objects: 
Object*) = {
    +    out.write(Opcodes.GLOBAL)
    +    out.write((module + "\n" + name + "\n").getBytes)
    +    out.write(Opcodes.MARK)
    +    objects.foreach(pickler.save(_))
    +    out.write(Opcodes.TUPLE)
    +    out.write(Opcodes.REDUCE)
    +  }
     
    -  def variance: Array[Byte] = SerDe.serializeDoubleVector(summary.variance)
    +  private[python] class DenseVectorPickler extends IObjectPickler {
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val vector: DenseVector = obj.asInstanceOf[DenseVector]
    +      reduce_object(out, pickler, "pyspark.mllib.linalg", "DenseVector", 
vector.toArray)
    +    }
    +  }
     
    -  def count: Long = summary.count
    +  private[python] class DenseVectorConstructor extends IObjectConstructor {
    +    def construct(args: Array[Object]) :Object = {
    +      require(args.length == 1)
    +      new DenseVector(args(0).asInstanceOf[Array[Double]])
    +    }
    +  }
    +
    +  private[python] class DenseMatrixPickler extends IObjectPickler {
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val m: DenseMatrix = obj.asInstanceOf[DenseMatrix]
    +      reduce_object(out, pickler, "pyspark.mllib.linalg", "DenseMatrix",
    +        m.numRows.asInstanceOf[Object], m.numCols.asInstanceOf[Object], 
m.values)
    +    }
    +  }
     
    -  def numNonzeros: Array[Byte] = 
SerDe.serializeDoubleVector(summary.numNonzeros)
    +  private[python] class DenseMatrixConstructor extends IObjectConstructor {
    +    def construct(args: Array[Object]) :Object = {
    +      require(args.length == 3)
    +      new DenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int],
    +        args(2).asInstanceOf[Array[Double]])
    +    }
    +  }
     
    -  def max: Array[Byte] = SerDe.serializeDoubleVector(summary.max)
    +  private[python] class SparseVectorPickler extends IObjectPickler {
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val v: SparseVector = obj.asInstanceOf[SparseVector]
    +      reduce_object(out, pickler, "pyspark.mllib.linalg", "SparseVector",
    +        v.size.asInstanceOf[Object], v.indices, v.values)
    +    }
    +  }
     
    -  def min: Array[Byte] = SerDe.serializeDoubleVector(summary.min)
    -}
    +  private[python] class SparseVectorConstructor extends IObjectConstructor 
{
    +    def construct(args: Array[Object]) :Object = {
    +      require(args.length == 3)
    +      new SparseVector(args(0).asInstanceOf[Int], 
args(1).asInstanceOf[Array[Int]],
    +        args(2).asInstanceOf[Array[Double]])
    +    }
    +  }
     
    -/**
    - * SerDe utility functions for PythonMLLibAPI.
    - */
    -private[spark] object SerDe extends Serializable {
    -  private val DENSE_VECTOR_MAGIC: Byte = 1
    -  private val SPARSE_VECTOR_MAGIC: Byte = 2
    -  private val DENSE_MATRIX_MAGIC: Byte = 3
    -  private val LABELED_POINT_MAGIC: Byte = 4
    -
    -  private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: 
Int = 0): Vector = {
    -    require(bytes.length - offset >= 5, "Byte array too short")
    -    val magic = bytes(offset)
    -    if (magic == DENSE_VECTOR_MAGIC) {
    -      deserializeDenseVector(bytes, offset)
    -    } else if (magic == SPARSE_VECTOR_MAGIC) {
    -      deserializeSparseVector(bytes, offset)
    -    } else {
    -      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
    +  private[python] class LabeledPointPickler extends IObjectPickler {
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val point: LabeledPoint = obj.asInstanceOf[LabeledPoint]
    +      reduce_object(out, pickler, "pyspark.mllib.regression", 
"LabeledPoint",
    +        point.label.asInstanceOf[Object], point.features)
         }
       }
     
    -  private[python] def deserializeDouble(bytes: Array[Byte], offset: Int = 
0): Double = {
    -    require(bytes.length - offset == 8, "Wrong size byte array for Double")
    -    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.getDouble
    -  }
    -
    -  private[python] def deserializeDenseVector(bytes: Array[Byte], offset: 
Int = 0): Vector = {
    -    val packetLength = bytes.length - offset
    -    require(packetLength >= 5, "Byte array too short")
    -    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
    -    bb.order(ByteOrder.nativeOrder())
    -    val magic = bb.get()
    -    require(magic == DENSE_VECTOR_MAGIC, "Invalid magic: " + magic)
    -    val length = bb.getInt()
    -    require (packetLength == 5 + 8 * length, "Invalid packet length: " + 
packetLength)
    -    val db = bb.asDoubleBuffer()
    -    val ans = new Array[Double](length.toInt)
    -    db.get(ans)
    -    Vectors.dense(ans)
    -  }
    -
    -  private[python] def deserializeSparseVector(bytes: Array[Byte], offset: 
Int = 0): Vector = {
    -    val packetLength = bytes.length - offset
    -    require(packetLength >= 9, "Byte array too short")
    -    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
    -    bb.order(ByteOrder.nativeOrder())
    -    val magic = bb.get()
    -    require(magic == SPARSE_VECTOR_MAGIC, "Invalid magic: " + magic)
    -    val size = bb.getInt()
    -    val nonZeros = bb.getInt()
    -    require (packetLength == 9 + 12 * nonZeros, "Invalid packet length: " 
+ packetLength)
    -    val ib = bb.asIntBuffer()
    -    val indices = new Array[Int](nonZeros)
    -    ib.get(indices)
    -    bb.position(bb.position() + 4 * nonZeros)
    -    val db = bb.asDoubleBuffer()
    -    val values = new Array[Double](nonZeros)
    -    db.get(values)
    -    Vectors.sparse(size, indices, values)
    +  private[python] class LabeledPointConstructor extends IObjectConstructor 
{
    +    def construct(args: Array[Object]) :Object = {
    +      if (args.length != 2) {
    +        throw new PickleException("should be 2")
    +      }
    +      new LabeledPoint(args(0).asInstanceOf[Double], 
args(1).asInstanceOf[Vector])
    +    }
       }
     
       /**
    -   * Returns an 8-byte array for the input Double.
    -   *
    -   * Note: we currently do not use a magic byte for double for storage 
efficiency.
    -   * This should be reconsidered when we add Ser/De for other 8-byte types 
(e.g. Long), for safety.
    -   * The corresponding deserializer, deserializeDouble, needs to be 
modified as well if the
    -   * serialization scheme changes.
    +   * Pickle Rating
        */
    -  private[python] def serializeDouble(double: Double): Array[Byte] = {
    -    val bytes = new Array[Byte](8)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.putDouble(double)
    -    bytes
    -  }
    -
    -  private[python] def serializeDenseVector(doubles: Array[Double]): 
Array[Byte] = {
    -    val len = doubles.length
    -    val bytes = new Array[Byte](5 + 8 * len)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(DENSE_VECTOR_MAGIC)
    -    bb.putInt(len)
    -    val db = bb.asDoubleBuffer()
    -    db.put(doubles)
    -    bytes
    -  }
    -
    -  private[python] def serializeSparseVector(vector: SparseVector): 
Array[Byte] = {
    -    val nonZeros = vector.indices.length
    -    val bytes = new Array[Byte](9 + 12 * nonZeros)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(SPARSE_VECTOR_MAGIC)
    -    bb.putInt(vector.size)
    -    bb.putInt(nonZeros)
    -    val ib = bb.asIntBuffer()
    -    ib.put(vector.indices)
    -    bb.position(bb.position() + 4 * nonZeros)
    -    val db = bb.asDoubleBuffer()
    -    db.put(vector.values)
    -    bytes
    -  }
    -
    -  private[python] def serializeDoubleVector(vector: Vector): Array[Byte] = 
vector match {
    -    case s: SparseVector =>
    -      serializeSparseVector(s)
    -    case _ =>
    -      serializeDenseVector(vector.toArray)
    -  }
    -
    -  private[python] def deserializeDoubleMatrix(bytes: Array[Byte]): 
Array[Array[Double]] = {
    -    val packetLength = bytes.length
    -    if (packetLength < 9) {
    -      throw new IllegalArgumentException("Byte array too short.")
    +  private[python] class RatingPickler extends IObjectPickler {
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val rating: Rating = obj.asInstanceOf[Rating]
    +      reduce_object(out, pickler, "pyspark.mllib.recommendation", "Rating",
    +        rating.user.asInstanceOf[Object], 
rating.product.asInstanceOf[Object],
    +        rating.rating.asInstanceOf[Object])
         }
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    val magic = bb.get()
    -    if (magic != DENSE_MATRIX_MAGIC) {
    -      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
    -    }
    -    val rows = bb.getInt()
    -    val cols = bb.getInt()
    -    if (packetLength != 9 + 8 * rows * cols) {
    -      throw new IllegalArgumentException("Size " + rows + "x" + cols + " 
is wrong.")
    -    }
    -    val db = bb.asDoubleBuffer()
    -    val ans = new Array[Array[Double]](rows.toInt)
    -    for (i <- 0 until rows.toInt) {
    -      ans(i) = new Array[Double](cols.toInt)
    -      db.get(ans(i))
    -    }
    -    ans
       }
     
    -  private[python] def serializeDoubleMatrix(doubles: 
Array[Array[Double]]): Array[Byte] = {
    -    val rows = doubles.length
    -    var cols = 0
    -    if (rows > 0) {
    -      cols = doubles(0).length
    -    }
    -    val bytes = new Array[Byte](9 + 8 * rows * cols)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(DENSE_MATRIX_MAGIC)
    -    bb.putInt(rows)
    -    bb.putInt(cols)
    -    val db = bb.asDoubleBuffer()
    -    for (i <- 0 until rows) {
    -      db.put(doubles(i))
    +  /**
    +   * Unpickle Rating
    +   */
    +  private[python] class RatingConstructor extends IObjectConstructor {
    +    def construct(args: Array[Object]) :Object = {
    +      if (args.length != 3) {
    +        throw new PickleException("should be 3")
    +      }
    +      new Rating(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int],
    +        args(2).asInstanceOf[Double])
         }
    -    bytes
       }
     
    -  private[python] def serializeLabeledPoint(p: LabeledPoint): Array[Byte] 
= {
    -    val fb = serializeDoubleVector(p.features)
    -    val bytes = new Array[Byte](1 + 8 + fb.length)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(LABELED_POINT_MAGIC)
    -    bb.putDouble(p.label)
    -    bb.put(fb)
    -    bytes
    +  def initialize() = {
    --- End diff --
    
    Please declare the return type of `initialize()` explicitly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to