[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread mengxr
Github user mengxr commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-157154553
  
test this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/9674#discussion_r44988807
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala ---
@@ -200,4 +235,121 @@ class PipelineModel private[ml] (
   override def copy(extra: ParamMap): PipelineModel = {
     new PipelineModel(uid, stages.map(_.copy(extra))).setParent(parent)
   }
+
+  override def write: Writer = new PipelineModelWriter(this)
+}
+
+object PipelineModel extends Readable[PipelineModel] {
+
+  override def read: Reader[PipelineModel] = new PipelineModelReader
+
+  override def load(path: String): PipelineModel = read.load(path)
+}
+
+private[ml] class PipelineModelWriter(instance: PipelineModel) extends Writer {
+
+  PipelineSharedWriter.validateStages(instance.stages.asInstanceOf[Array[PipelineStage]])
+
+  override protected def saveImpl(path: String): Unit = PipelineSharedWriter.saveImpl(instance,
+    instance.stages.asInstanceOf[Array[PipelineStage]], sc, path)
+}
+
+private[ml] class PipelineModelReader extends Reader[PipelineModel] {
+
+  /** Checked against metadata when loading model */
+  private val className = "org.apache.spark.ml.PipelineModel"
+
+  override def load(path: String): PipelineModel = {
+    val (uid: String, stages: Array[PipelineStage]) =
+      PipelineSharedReader.load(className, sc, path)
+    val transformers = stages map {
+      case stage: Transformer => stage
+      case stage => throw new RuntimeException(s"PipelineModel.read loaded a stage but found it" +
+        s" was not a Transformer.  Bad stage: ${stage.uid}")
+    }
+    new PipelineModel(uid, transformers)
+  }
+}
+
+/** Methods for [[Writer]] shared between [[Pipeline]] and [[PipelineModel]] */
+private[ml] object PipelineSharedWriter {
+
+  import org.json4s.JsonDSL._
+
+  /** Check that all stages are Writable */
+  def validateStages(stages: Array[PipelineStage]): Unit = {
+    stages.foreach {
+      case stage: Writable => // good
+      case stage =>
+        throw new UnsupportedOperationException("Pipeline write will fail on this Pipeline" +
--- End diff --

But a user could write:
```
val writer = pipeline.write   // failure will occur here, before attempting to write
writer.save(...)
```
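The point can be sketched in a self-contained way (simplified, hypothetical stand-ins for the `Writable` trait and the writer class, not the actual Spark API): because the validation runs in the writer's constructor, the exception surfaces at `write`, before `save` is ever attempted.

```scala
// Hypothetical, simplified stand-ins for the ml.util types under discussion.
trait Writable
class Stage(val uid: String)
class WritableStage(uid: String) extends Stage(uid) with Writable

class PipelineWriter(stages: Seq[Stage]) {
  // Validation runs eagerly, at construction time -- i.e. at `pipeline.write`.
  stages.foreach {
    case _: Writable => // good
    case s => throw new UnsupportedOperationException(
      s"Pipeline write will fail: non-Writable stage ${s.uid}")
  }
  def save(path: String): Unit = println(s"would save to $path")
}

// new PipelineWriter(Seq(new Stage("custom_1")))  // throws here, before save(...)
```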





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/9674#discussion_r44990091
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala ---
@@ -200,4 +235,121 @@ class PipelineModel private[ml] (
[...]
+  def saveImpl(
+      instance: Params,
+      stages: Array[PipelineStage],
+      sc: SparkContext,
+      path: String): Unit = {
+    // Copied and edited from DefaultParamsWriter.saveMetadata
+    // TODO: modify DefaultParamsWriter.saveMetadata to avoid duplication
+    val uid = instance.uid
+    val cls = instance.getClass.getName
+    val stageUids = stages.map(_.uid)
+    val jsonParams = List("stageUids" -> parse(compact(render(stageUids.toSeq
--- End diff --

Is there a better way than this?




[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/9674#discussion_r44987093
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala ---
@@ -200,4 +235,121 @@ class PipelineModel private[ml] (
[...]
+    val jsonParams = List("stageUids" -> parse(compact(render(stageUids.toSeq))))
+    val metadata = ("class" -> cls) ~
+      ("timestamp" -> System.currentTimeMillis()) ~
+      ("sparkVersion" -> sc.version) ~
+      ("uid" -> uid) ~
+      ("paramMap" -> jsonParams)
+    val metadataPath = new Path(path, "metadata").toString
+    val metadataJson = compact(render(metadata))
+    sc.parallelize(Seq(metadataJson), 1).saveAsTextFile(metadataPath)
+
+    // Save stages
+    val stagesDir = new Path(path, "stages").toString
+    stages.foreach {
+      case stage: Writable =>
+        val stagePath = new Path(stagesDir, stage.uid).toString
--- End diff --

It is useful if we prefix the stagePath with the index, e.g., `0_Tokenizer_123/, 1_LogisticRegression_456/`. This helps manual inspection.
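The suggested layout could be derived with a small helper (hypothetical, for illustration only, not part of the PR): prefixing each stage directory with its position keeps the pipeline order visible when listing `stages/`.

```scala
// Hypothetical helper: derive index-prefixed directory names for the stages,
// e.g. "0_Tokenizer_123", "1_LogisticRegression_456".
def stageDirNames(stageUids: Seq[String]): Seq[String] =
  stageUids.zipWithIndex.map { case (uid, idx) => s"${idx}_$uid" }
```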




[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/9674#discussion_r44987110
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala ---
@@ -200,4 +235,121 @@ class PipelineModel private[ml] (
[...]
+/** Methods for [[Reader]] shared between [[Pipeline]] and [[PipelineModel]] */
+private[ml] object PipelineSharedReader {
+
+  def load(className: String, sc: SparkContext, path: String): (String, Array[PipelineStage]) = {
+    val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
+
+    implicit val format = DefaultFormats
+    val stagesDir = new Path(path, "stages").toString
+    val stageUids: Array[String] = metadata.params match {
+      case JObject(pairs) =>
+        if (pairs.length != 1) {
+          // Should not happen unless file is corrupted or we have a bug.
+          throw new RuntimeException(
+            s"Pipeline read expected 1 Param (stageUids), but found ${pairs.length}.")
+        }
+        pairs.head match {
+          case ("stageUids", jsonValue) =>
+            parse(compact(render(jsonValue))).extract[Seq[String]].toArray
--- End diff --

Would `jsonValue.extract[Seq[String]].toArray` work?



[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/9674#discussion_r44987081
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala ---
@@ -200,4 +235,121 @@ class PipelineModel private[ml] (
[...]
+  /** Check that all stages are Writable */
+  def validateStages(stages: Array[PipelineStage]): Unit = {
+    stages.foreach {
+      case stage: Writable => // good
+      case stage =>
+        throw new UnsupportedOperationException("Pipeline write will fail on this Pipeline" +
+          s" because it contains a stage which does not implement Writable. Non-Writable stage:" +
+          s" ${stage.uid}")
+    }
+  }
+
+  def saveImpl(
--- End diff --

minor: `saveImpl` -> `save`?




[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/9674#discussion_r44987116
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala ---
@@ -200,4 +235,121 @@ class PipelineModel private[ml] (
[...]
+        pairs.head match {
+          case ("stageUids", jsonValue) =>
+            parse(compact(render(jsonValue))).extract[Seq[String]].toArray
+          case (paramName, jsonValue) =>
+            // Should not happen unless file is corrupted or we have a bug.
+            throw new RuntimeException(s"Pipeline read encountered unexpected Param $paramName" +
+              s" in metadata: ${metadata.metadataStr}")
+        }
+      case _ =>
+        throw new

[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/9674#discussion_r44987076
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala ---
@@ -200,4 +235,121 @@ class PipelineModel private[ml] (
[...]
+  /** Check that all stages are Writable */
+  def validateStages(stages: Array[PipelineStage]): Unit = {
+    stages.foreach {
+      case stage: Writable => // good
+      case stage =>
+        throw new UnsupportedOperationException("Pipeline write will fail on this Pipeline" +
+          s" because it contains a stage which does not implement Writable. Non-Writable stage:" +
+          s" ${stage.uid}")
--- End diff --

Include `stage.getClass`. Though the default implementation of `uid` contains the class name, users can set arbitrary values.
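A quick sketch of why this helps (hypothetical `Stage` stand-in, not the Spark class): the uid is a free-form user-settable string, so only `getClass` reliably identifies the offending type.

```scala
// Hypothetical stand-in: uid is user-settable; the runtime class is not.
class Stage(val uid: String)

// Error message including both the class and the uid, as suggested.
def nonWritableMsg(stage: Stage): String =
  s"Non-Writable stage of type ${stage.getClass.getName}: ${stage.uid}"
```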




[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/9674#discussion_r44987089
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala ---
@@ -200,4 +235,121 @@ class PipelineModel private[ml] (
[...]
+  def saveImpl(
+      instance: Params,
+      stages: Array[PipelineStage],
+      sc: SparkContext,
+      path: String): Unit = {
+    // Copied and edited from DefaultParamsWriter.saveMetadata
+    // TODO: modify DefaultParamsWriter.saveMetadata to avoid duplication
+    val uid = instance.uid
+    val cls = instance.getClass.getName
+    val stageUids = stages.map(_.uid)
+    val jsonParams = List("stageUids" -> parse(compact(render(stageUids.toSeq
--- End diff --

Pass `stageUids` directly if you define `stageUids` as `stages.map(_.uid).toSeq`. I guess it should work.




[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/9674#discussion_r44987052
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala ---
@@ -200,4 +235,121 @@ class PipelineModel private[ml] (
[...]
+  override def load(path: String): PipelineModel = {
+    val (uid: String, stages: Array[PipelineStage]) =
+      PipelineSharedReader.load(className, sc, path)
+    val transformers = stages map {
+      case stage: Transformer => stage
+      case stage => throw new RuntimeException(s"PipelineModel.read loaded a stage but found it" +
+        s" was not a Transformer.  Bad stage: ${stage.uid}")
--- End diff --

include `stage.getClass` in the error message




[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/9674#discussion_r44987074
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala ---
@@ -200,4 +235,121 @@ class PipelineModel private[ml] (
[...]
+  /** Check that all stages are Writable */
+  def validateStages(stages: Array[PipelineStage]): Unit = {
+    stages.foreach {
+      case stage: Writable => // good
+      case stage =>
+        throw new UnsupportedOperationException("Pipeline write will fail on this Pipeline" +
--- End diff --

minor: `will fail` -> `failed`, `this Pipeline` -> `pipeline ${pipeline.uid}`





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/9674#discussion_r44987064
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala ---
@@ -200,4 +235,121 @@ class PipelineModel private[ml] (
   override def copy(extra: ParamMap): PipelineModel = {
     new PipelineModel(uid, stages.map(_.copy(extra))).setParent(parent)
   }
+
+  override def write: Writer = new PipelineModelWriter(this)
+}
+
+object PipelineModel extends Readable[PipelineModel] {
+
+  override def read: Reader[PipelineModel] = new PipelineModelReader
+
+  override def load(path: String): PipelineModel = read.load(path)
+}
+
+private[ml] class PipelineModelWriter(instance: PipelineModel) extends Writer {
+
+  PipelineSharedWriter.validateStages(instance.stages.asInstanceOf[Array[PipelineStage]])
+
+  override protected def saveImpl(path: String): Unit = PipelineSharedWriter.saveImpl(instance,
+    instance.stages.asInstanceOf[Array[PipelineStage]], sc, path)
+}
+
+private[ml] class PipelineModelReader extends Reader[PipelineModel] {
+
+  /** Checked against metadata when loading model */
+  private val className = "org.apache.spark.ml.PipelineModel"
+
+  override def load(path: String): PipelineModel = {
+    val (uid: String, stages: Array[PipelineStage]) =
+      PipelineSharedReader.load(className, sc, path)
+    val transformers = stages map {
+      case stage: Transformer => stage
+      case stage => throw new RuntimeException(s"PipelineModel.read loaded a stage but found it" +
+        s" was not a Transformer.  Bad stage: ${stage.uid}")
+    }
+    new PipelineModel(uid, transformers)
+  }
+}
+
+/** Methods for [[Writer]] shared between [[Pipeline]] and [[PipelineModel]] */
+private[ml] object PipelineSharedWriter {
+
+  import org.json4s.JsonDSL._
+
+  /** Check that all stages are Writable */
+  def validateStages(stages: Array[PipelineStage]): Unit = {
+    stages.foreach {
+      case stage: Writable => // good
+      case stage =>
--- End diff --

minor: `other`





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/9674#discussion_r44987039
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala ---
@@ -200,4 +235,121 @@ class PipelineModel private[ml] (
   override def copy(extra: ParamMap): PipelineModel = {
     new PipelineModel(uid, stages.map(_.copy(extra))).setParent(parent)
   }
+
+  override def write: Writer = new PipelineModelWriter(this)
+}
+
+object PipelineModel extends Readable[PipelineModel] {
+
+  override def read: Reader[PipelineModel] = new PipelineModelReader
+
+  override def load(path: String): PipelineModel = read.load(path)
+}
+
+private[ml] class PipelineModelWriter(instance: PipelineModel) extends Writer {
+
+  PipelineSharedWriter.validateStages(instance.stages.asInstanceOf[Array[PipelineStage]])
+
+  override protected def saveImpl(path: String): Unit = PipelineSharedWriter.saveImpl(instance,
+    instance.stages.asInstanceOf[Array[PipelineStage]], sc, path)
+}
+
+private[ml] class PipelineModelReader extends Reader[PipelineModel] {
+
+  /** Checked against metadata when loading model */
+  private val className = "org.apache.spark.ml.PipelineModel"
+
+  override def load(path: String): PipelineModel = {
+    val (uid: String, stages: Array[PipelineStage]) =
+      PipelineSharedReader.load(className, sc, path)
+    val transformers = stages map {
+      case stage: Transformer => stage
+      case stage => throw new RuntimeException(s"PipelineModel.read loaded a stage but found it" +
--- End diff --

minor: `stage` -> `other`





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread mengxr
Github user mengxr commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-157179166
  
One suggestion is to merge `PipelineSharedWriter` and `PipelineSharedReader`
into a single object under `object Pipeline`, e.g. called `SharedReadWrite`.
Then move `PipelineReader` and `PipelineWriter` into `object Pipeline`, and
`PipelineModelReader` and `PipelineModelWriter` into `object PipelineModel`. The
main purpose is to avoid polluting the package namespace in Java; otherwise, they
are all visible under `org.apache.spark.ml` in Java.
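A minimal, plain-Scala sketch of the nesting described above (no Spark dependency; the member bodies and the `NestingDemo` object are illustrative placeholders, not the real persistence code):

```scala
// Illustrative sketch of the suggested layout: helper objects nested inside
// the companion object, so the only top-level name visible from Java in the
// package is Pipeline itself.
object Pipeline {

  // Would hold the merged PipelineSharedWriter / PipelineSharedReader logic.
  object SharedReadWrite {
    def validateStages(stages: Array[AnyRef]): Unit =
      stages.foreach(s => require(s != null, "null stage"))
  }

  // PipelineWriter / PipelineReader would likewise become nested classes
  // rather than package-level ones.
  private class PipelineWriter
}

object NestingDemo extends App {
  // The helper is reachable only through the companion object:
  Pipeline.SharedReadWrite.validateStages(Array[AnyRef]("tokenizer", "lr"))
  println("ok")
}
```

From Java, the nested helpers appear as static members of `Pipeline` rather than as separate classes in the package listing, which is the point of the suggestion.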





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/9674#discussion_r44983864
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala ---
@@ -166,6 +173,34 @@ class Pipeline(override val uid: String) extends Estimator[PipelineModel] {
       "Cannot have duplicate components in a pipeline.")
     theStages.foldLeft(schema)((cur, stage) => stage.transformSchema(cur))
   }
+
+  override def write: Writer = new PipelineWriter(this)
+}
+
+object Pipeline extends Readable[Pipeline] {
+
+  override def read: Reader[Pipeline] = new PipelineReader
+
+  override def load(path: String): Pipeline = read.load(path)
+}
+
+private[ml] class PipelineWriter(instance: Pipeline) extends Writer {
+
+  PipelineSharedWriter.validateStages(instance.getStages)
--- End diff --

Should users be able to save an incomplete pipeline? For example, I could make a
template pipeline, send it to other users, and they would only need to fill in
some required params like inputCol after they load it back.
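A hedged sketch of the template workflow being asked about, assuming the Writer/Reader API from this PR; the stage, params, and path are illustrative, and it needs a running Spark context:

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.Tokenizer

// A "template": the stage is Writable, but required params such as
// inputCol are deliberately left unset.
val template = new Pipeline().setStages(Array(new Tokenizer()))

// The question: should this succeed, given validateStages only checks
// that stages are Writable, not that their params are complete?
template.write.save("/tmp/pipeline-template")

// A downstream user would load it back and fill in the params:
val loaded = Pipeline.load("/tmp/pipeline-template")
// loaded.getStages(0).asInstanceOf[Tokenizer].setInputCol("text")
```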





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-157155290
  
**[Test build #46012 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46012/consoleFull)**
 for PR 9674 at commit 
[`caf57c2`](https://github.com/apache/spark/commit/caf57c2c48d90e3f4160626ca77a173260756cfe).





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-157169733
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-157169560
  
**[Test build #46012 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46012/consoleFull)**
 for PR 9674 at commit 
[`caf57c2`](https://github.com/apache/spark/commit/caf57c2c48d90e3f4160626ca77a173260756cfe).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `class Pipeline(override val uid: String) extends Estimator[PipelineModel] with Writable`





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-157169735
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/46012/
Test PASSed.





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-157205145
  
**[Test build #46026 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46026/consoleFull)**
 for PR 9674 at commit 
[`f791010`](https://github.com/apache/spark/commit/f791010ec056ab9684329083e9bb63f142b3150c).





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-157224892
  
**[Test build #46026 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46026/consoleFull)**
 for PR 9674 at commit 
[`f791010`](https://github.com/apache/spark/commit/f791010ec056ab9684329083e9bb63f142b3150c).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `class Pipeline(override val uid: String) extends Estimator[PipelineModel] with Writable`





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread mengxr
Github user mengxr commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-157205763
  
LGTM pending Jenkins.





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-157225031
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/46026/
Test PASSed.





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-157225029
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/9674#discussion_r44999160
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala ---
@@ -200,4 +235,121 @@ class PipelineModel private[ml] (
   override def copy(extra: ParamMap): PipelineModel = {
     new PipelineModel(uid, stages.map(_.copy(extra))).setParent(parent)
   }
+
+  override def write: Writer = new PipelineModelWriter(this)
+}
+
+object PipelineModel extends Readable[PipelineModel] {
+
+  override def read: Reader[PipelineModel] = new PipelineModelReader
+
+  override def load(path: String): PipelineModel = read.load(path)
+}
+
+private[ml] class PipelineModelWriter(instance: PipelineModel) extends Writer {
+
+  PipelineSharedWriter.validateStages(instance.stages.asInstanceOf[Array[PipelineStage]])
+
+  override protected def saveImpl(path: String): Unit = PipelineSharedWriter.saveImpl(instance,
+    instance.stages.asInstanceOf[Array[PipelineStage]], sc, path)
+}
+
+private[ml] class PipelineModelReader extends Reader[PipelineModel] {
+
+  /** Checked against metadata when loading model */
+  private val className = "org.apache.spark.ml.PipelineModel"
+
+  override def load(path: String): PipelineModel = {
+    val (uid: String, stages: Array[PipelineStage]) =
+      PipelineSharedReader.load(className, sc, path)
+    val transformers = stages map {
+      case stage: Transformer => stage
+      case stage => throw new RuntimeException(s"PipelineModel.read loaded a stage but found it" +
+        s" was not a Transformer.  Bad stage: ${stage.uid}")
+    }
+    new PipelineModel(uid, transformers)
+  }
+}
+
+/** Methods for [[Writer]] shared between [[Pipeline]] and [[PipelineModel]] */
+private[ml] object PipelineSharedWriter {
+
+  import org.json4s.JsonDSL._
+
+  /** Check that all stages are Writable */
+  def validateStages(stages: Array[PipelineStage]): Unit = {
+    stages.foreach {
+      case stage: Writable => // good
+      case stage =>
+        throw new UnsupportedOperationException("Pipeline write will fail on this Pipeline" +
+          s" because it contains a stage which does not implement Writable. Non-Writable stage:" +
+          s" ${stage.uid}")
+    }
+  }
+
+  def saveImpl(
--- End diff --

I like saveImpl since it's analogous to the other saveImpl methods which call it.





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread jkbradley
Github user jkbradley commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-157203769
  
@mengxr  Thanks for reviewing!  I believe I addressed everything, except 
where I quibbled in responses above.





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread jkbradley
Github user jkbradley commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-157227862
  
@mengxr Thank you for reviewing!  Merging with master and branch-1.6





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/9674





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-13 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-156537144
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45881/
Test FAILed.





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-13 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-156537143
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-13 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-156568081
  
**[Test build #45897 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45897/consoleFull)**
 for PR 9674 at commit 
[`caf57c2`](https://github.com/apache/spark/commit/caf57c2c48d90e3f4160626ca77a173260756cfe).





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-13 Thread mengxr
Github user mengxr commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-156563935
  
test this please





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-13 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-156576715
  
**[Test build #45897 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45897/consoleFull)**
 for PR 9674 at commit 
[`caf57c2`](https://github.com/apache/spark/commit/caf57c2c48d90e3f4160626ca77a173260756cfe).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `class Pipeline(override val uid: String) extends Estimator[PipelineModel] with Writable`





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-13 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-156577016
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-13 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-156577017
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45897/
Test PASSed.





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-12 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-156221746
  
**[Test build #45764 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45764/consoleFull)**
 for PR 9674 at commit 
[`3700091`](https://github.com/apache/spark/commit/3700091c33005422a36e20f35019422b7aaaf813).





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-12 Thread jkbradley
GitHub user jkbradley opened a pull request:

https://github.com/apache/spark/pull/9674

[SPARK-11612] [ML] Pipeline and PipelineModel persistence

Pipeline and PipelineModel extend Readable and Writable.  Persistence 
succeeds only when all stages are Writable.

Note: This PR reinstates tests for other read/write functionality.  It 
should probably not get merged until 
[https://issues.apache.org/jira/browse/SPARK-11672] gets fixed.

CC: @mengxr 
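For context, the intended round trip looks roughly like this (a sketch against the API added in this PR; the stage choices, params, and paths are illustrative, persistence of each stage is assumed, and it requires a running SparkContext):

```scala
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.Tokenizer

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val lr = new LogisticRegression()
val pipeline = new Pipeline().setStages(Array(tokenizer, lr))

// Save/load the unfitted Pipeline; this succeeds only because every
// stage above is assumed to implement Writable.
pipeline.write.save("/tmp/unfit-pipeline")
val samePipeline = Pipeline.load("/tmp/unfit-pipeline")

// After fitting, the PipelineModel persists the same way:
// val model = pipeline.fit(trainingData)
// model.write.save("/tmp/fitted-model")
// val sameModel = PipelineModel.load("/tmp/fitted-model")
```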

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jkbradley/spark pipeline-io

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/9674.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #9674


commit f3c633fa87d9bb58b3836397b6dc3e6d94fc9104
Author: Joseph K. Bradley 
Date:   2015-11-10T21:16:40Z

added save/load to logreg in spark.ml

commit 375086b7040b8d1723cbd88d44173b87eb93c9d7
Author: Joseph K. Bradley 
Date:   2015-11-10T21:53:42Z

fixed read, write for logreg

commit bc4c838506ebaed123df38124c76d38c4fb501a4
Author: Joseph K. Bradley 
Date:   2015-11-11T18:56:44Z

added Pipeline save, load but not PipelineModel

commit bf3506f338f62b140874c692b126803884f87324
Author: Joseph K. Bradley 
Date:   2015-11-12T20:03:22Z

added PipelineModel save/load

commit 3700091c33005422a36e20f35019422b7aaaf813
Author: Joseph K. Bradley 
Date:   2015-11-12T20:08:16Z

reorder for Pipeline.scala classes







[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-156221139
  
Merged build started.





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-156221102
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-156235842
  
Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-156235845
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45764/




[GitHub] spark pull request: [SPARK-11612] [ML] Pipeline and PipelineModel ...

2015-11-12 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/9674#issuecomment-156235482
  
**[Test build #45764 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45764/consoleFull)** for PR 9674 at commit [`3700091`](https://github.com/apache/spark/commit/3700091c33005422a36e20f35019422b7aaaf813).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `class Pipeline(override val uid: String) extends Estimator[PipelineModel] with Writable`
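For readers following the thread: the `Writable` mixin reported above is what gives `Pipeline` a `write` method for persistence. The sketch below shows how that API would typically be exercised; it is a minimal usage sketch, not part of the patch itself. The stage configuration and the save path are illustrative assumptions, and running it requires a Spark context, so treat it as pseudocode for the call pattern rather than a standalone program.

```scala
// Illustrative sketch of using the save/load support added in SPARK-11612.
// Stage choices and the path "/tmp/spark-pipeline" are assumptions for the example.
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10)

// A Pipeline now mixes in Writable, so its parameters and stage list
// can be persisted and reloaded.
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
pipeline.write.save("/tmp/spark-pipeline")
val reloaded = Pipeline.load("/tmp/spark-pipeline")
```

The same pattern applies to a fitted `PipelineModel` (via the `PipelineModelWriter`/`PipelineModelReader` pair discussed earlier in the review), which is what makes whole trained pipelines portable across Spark applications.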
