[SPARK-2024] Add saveAsSequenceFile to PySpark

JIRA issue: https://issues.apache.org/jira/browse/SPARK-2024

This PR is a followup to #455 and adds capabilities for saving PySpark RDDs 
using SequenceFile or any Hadoop OutputFormats.

* Added RDD methods ```saveAsSequenceFile```, ```saveAsHadoopFile``` and 
```saveAsHadoopDataset```, for both old and new MapReduce APIs.

* Default converter for converting common data types to Writables. Users may 
specify custom converters to convert to desired data types.

* No out-of-box support for reading/writing arrays, since ArrayWritable itself 
doesn't have a no-arg constructor for creating an empty instance upon reading. 
Users need to provide ArrayWritable subtypes. Custom converters for converting 
arrays to suitable ArrayWritable subtypes are also needed when writing. When 
reading, the default converter will convert any custom ArrayWritable subtypes 
to ```Object[]``` and they get pickled to Python tuples.

* Added HBase and Cassandra output examples to show how custom output formats 
and converters can be used.

cc MLnick mateiz ahirreddy pwendell

Author: Kan Zhang <kzh...@apache.org>

Closes #1338 from kanzhang/SPARK-2024 and squashes the following commits:

c01e3ef [Kan Zhang] [SPARK-2024] code formatting
6591e37 [Kan Zhang] [SPARK-2024] renaming pickled -> pickledRDD
d998ad6 [Kan Zhang] [SPARK-2024] refectoring to get method params below 10
57a7a5e [Kan Zhang] [SPARK-2024] correcting typo
75ca5bd [Kan Zhang] [SPARK-2024] Better type checking for batch serialized RDD
0bdec55 [Kan Zhang] [SPARK-2024] Refactoring newly added tests
9f39ff4 [Kan Zhang] [SPARK-2024] Adding 2 saveAsHadoopDataset tests
0c134f3 [Kan Zhang] [SPARK-2024] Test refactoring and adding couple unbatched 
cases
7a176df [Kan Zhang] [SPARK-2024] Add saveAsSequenceFile to PySpark


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94d1f46f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94d1f46f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94d1f46f

Branch: refs/heads/master
Commit: 94d1f46fc43c0cb85125f757fb40db9271caf1f4
Parents: 437dc8c
Author: Kan Zhang <kzh...@apache.org>
Authored: Wed Jul 30 13:19:05 2014 -0700
Committer: Josh Rosen <joshro...@apache.org>
Committed: Wed Jul 30 13:19:05 2014 -0700

----------------------------------------------------------------------
 .../spark/api/python/PythonHadoopUtil.scala     |  82 ++++-
 .../org/apache/spark/api/python/PythonRDD.scala | 247 +++++++++++----
 .../org/apache/spark/api/python/SerDeUtil.scala |  61 +++-
 .../WriteInputFormatTestDataGenerator.scala     |  69 +++-
 docs/programming-guide.md                       |  52 ++-
 .../src/main/python/cassandra_outputformat.py   |  83 +++++
 examples/src/main/python/hbase_inputformat.py   |   3 +-
 examples/src/main/python/hbase_outputformat.py  |  65 ++++
 .../pythonconverters/CassandraConverters.scala  |  24 +-
 .../pythonconverters/HBaseConverter.scala       |  33 --
 .../pythonconverters/HBaseConverters.scala      |  70 ++++
 python/pyspark/context.py                       |  51 ++-
 python/pyspark/rdd.py                           | 114 +++++++
 python/pyspark/tests.py                         | 317 ++++++++++++++++++-
 14 files changed, 1085 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index adaa1ef..f3b05e1 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.api.python
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SerializableWritable, SparkException}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io._
 import scala.util.{Failure, Success, Try}
@@ -31,13 +32,14 @@ import org.apache.spark.annotation.Experimental
  * transformation code by overriding the convert method.
  */
 @Experimental
-trait Converter[T, U] extends Serializable {
+trait Converter[T, + U] extends Serializable {
   def convert(obj: T): U
 }
 
 private[python] object Converter extends Logging {
 
-  def getInstance(converterClass: Option[String]): Converter[Any, Any] = {
+  def getInstance(converterClass: Option[String],
+                  defaultConverter: Converter[Any, Any]): Converter[Any, Any] 
= {
     converterClass.map { cc =>
       Try {
         val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, 
Any]]
@@ -49,7 +51,7 @@ private[python] object Converter extends Logging {
           logError(s"Failed to load converter: $cc")
           throw err
       }
-    }.getOrElse { new DefaultConverter }
+    }.getOrElse { defaultConverter }
   }
 }
 
@@ -57,7 +59,9 @@ private[python] object Converter extends Logging {
  * A converter that handles conversion of common 
[[org.apache.hadoop.io.Writable]] objects.
  * Other objects are passed through without conversion.
  */
-private[python] class DefaultConverter extends Converter[Any, Any] {
+private[python] class WritableToJavaConverter(
+    conf: Broadcast[SerializableWritable[Configuration]],
+    batchSize: Int) extends Converter[Any, Any] {
 
   /**
    * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, 
String or
@@ -72,17 +76,30 @@ private[python] class DefaultConverter extends 
Converter[Any, Any] {
       case fw: FloatWritable => fw.get()
       case t: Text => t.toString
       case bw: BooleanWritable => bw.get()
-      case byw: BytesWritable => byw.getBytes
+      case byw: BytesWritable =>
+        val bytes = new Array[Byte](byw.getLength)
+        System.arraycopy(byw.getBytes(), 0, bytes, 0, byw.getLength)
+        bytes
       case n: NullWritable => null
-      case aw: ArrayWritable => aw.get().map(convertWritable(_))
-      case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
-        (convertWritable(k), convertWritable(v))
-      }.toMap)
+      case aw: ArrayWritable =>
+        // Due to erasure, all arrays appear as Object[] and they get pickled 
to Python tuples.
+        // Since we can't determine element types for empty arrays, we will 
not attempt to
+        // convert to primitive arrays (which get pickled to Python arrays). 
Users may want
+        // write custom converters for arrays if they know the element types a 
priori.
+        aw.get().map(convertWritable(_))
+      case mw: MapWritable =>
+        val map = new java.util.HashMap[Any, Any]()
+        mw.foreach { case (k, v) =>
+          map.put(convertWritable(k), convertWritable(v))
+        }
+        map
+      case w: Writable =>
+        if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
       case other => other
     }
   }
 
-  def convert(obj: Any): Any = {
+  override def convert(obj: Any): Any = {
     obj match {
       case writable: Writable =>
         convertWritable(writable)
@@ -92,6 +109,47 @@ private[python] class DefaultConverter extends 
Converter[Any, Any] {
   }
 }
 
+/**
+ * A converter that converts common types to 
[[org.apache.hadoop.io.Writable]]. Note that array
+ * types are not supported since the user needs to subclass 
[[org.apache.hadoop.io.ArrayWritable]]
+ * to set the type properly. See 
[[org.apache.spark.api.python.DoubleArrayWritable]] and
+ * [[org.apache.spark.api.python.DoubleArrayToWritableConverter]] for an 
example. They are used in
+ * PySpark RDD `saveAsNewAPIHadoopFile` doctest.
+ */
+private[python] class JavaToWritableConverter extends Converter[Any, Writable] 
{
+
+  /**
+   * Converts common data types to [[org.apache.hadoop.io.Writable]]. Note 
that array types are not
+   * supported out-of-the-box.
+   */
+  private def convertToWritable(obj: Any): Writable = {
+    import collection.JavaConversions._
+    obj match {
+      case i: java.lang.Integer => new IntWritable(i)
+      case d: java.lang.Double => new DoubleWritable(d)
+      case l: java.lang.Long => new LongWritable(l)
+      case f: java.lang.Float => new FloatWritable(f)
+      case s: java.lang.String => new Text(s)
+      case b: java.lang.Boolean => new BooleanWritable(b)
+      case aob: Array[Byte] => new BytesWritable(aob)
+      case null => NullWritable.get()
+      case map: java.util.Map[_, _] =>
+        val mapWritable = new MapWritable()
+        map.foreach { case (k, v) =>
+          mapWritable.put(convertToWritable(k), convertToWritable(v))
+        }
+        mapWritable
+      case other => throw new SparkException(
+        s"Data of type ${other.getClass.getName} cannot be used")
+    }
+  }
+
+  override def convert(obj: Any): Writable = obj match {
+    case writable: Writable => writable
+    case other => convertToWritable(other)
+  }
+}
+
 /** Utilities for working with Python objects <-> Hadoop-related objects */
 private[python] object PythonHadoopUtil {
 
@@ -118,7 +176,7 @@ private[python] object PythonHadoopUtil {
 
   /**
    * Converts an RDD of key-value pairs, where key and/or value could be 
instances of
-   * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
+   * [[org.apache.hadoop.io.Writable]], into an RDD of base types, or vice 
versa.
    */
   def convertRDD[K, V](rdd: RDD[(K, V)],
                        keyConverter: Converter[Any, Any],

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f551a59..a9d758b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -23,15 +23,18 @@ import java.nio.charset.Charset
 import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, 
Collections}
 
 import scala.collection.JavaConversions._
+import scala.language.existentials
 import scala.reflect.ClassTag
 import scala.util.Try
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.{InputFormat, JobConf}
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, 
OutputFormat => NewOutputFormat}
 import org.apache.spark._
+import org.apache.spark.SparkContext._
 import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
@@ -365,19 +368,17 @@ private[spark] object PythonRDD extends Logging {
       valueClassMaybeNull: String,
       keyConverterClass: String,
       valueConverterClass: String,
-      minSplits: Int) = {
+      minSplits: Int,
+      batchSize: Int) = {
     val keyClass = 
Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
     val valueClass = 
Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
-    implicit val kcm = 
ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
-    implicit val vcm = 
ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
-    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
-    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
-
+    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
+    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
-    val keyConverter = Converter.getInstance(Option(keyConverterClass))
-    val valueConverter = Converter.getInstance(Option(valueConverterClass))
-    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, 
valueConverter)
-    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+    val confBroadcasted = sc.sc.broadcast(new 
SerializableWritable(sc.hadoopConfiguration()))
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new WritableToJavaConverter(confBroadcasted, batchSize))
+    JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
   /**
@@ -394,17 +395,16 @@ private[spark] object PythonRDD extends Logging {
       valueClass: String,
       keyConverterClass: String,
       valueConverterClass: String,
-      confAsMap: java.util.HashMap[String, String]) = {
-    val conf = PythonHadoopUtil.mapToConf(confAsMap)
-    val baseConf = sc.hadoopConfiguration()
-    val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+      confAsMap: java.util.HashMap[String, String],
+      batchSize: Int) = {
+    val mergedConf = getMergedConf(confAsMap, sc.hadoopConfiguration())
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
         Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
-    val keyConverter = Converter.getInstance(Option(keyConverterClass))
-    val valueConverter = Converter.getInstance(Option(valueConverterClass))
-    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, 
valueConverter)
-    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new WritableToJavaConverter(confBroadcasted, batchSize))
+    JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
   /**
@@ -421,15 +421,16 @@ private[spark] object PythonRDD extends Logging {
       valueClass: String,
       keyConverterClass: String,
       valueConverterClass: String,
-      confAsMap: java.util.HashMap[String, String]) = {
+      confAsMap: java.util.HashMap[String, String],
+      batchSize: Int) = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClass, keyClass, valueClass, conf)
-    val keyConverter = Converter.getInstance(Option(keyConverterClass))
-    val valueConverter = Converter.getInstance(Option(valueConverterClass))
-    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, 
valueConverter)
-    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new WritableToJavaConverter(confBroadcasted, batchSize))
+    JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
   private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](
@@ -439,18 +440,14 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration) = {
-    implicit val kcm = 
ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
-    implicit val vcm = 
ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
-    implicit val fcm = 
ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
-    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
-    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
-    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
-    val rdd = if (path.isDefined) {
+    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
+    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+    val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+    if (path.isDefined) {
       sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
     } else {
       sc.sc.newAPIHadoopRDD[K, V, F](conf, fc, kc, vc)
     }
-    rdd
   }
 
   /**
@@ -467,17 +464,16 @@ private[spark] object PythonRDD extends Logging {
       valueClass: String,
       keyConverterClass: String,
       valueConverterClass: String,
-      confAsMap: java.util.HashMap[String, String]) = {
-    val conf = PythonHadoopUtil.mapToConf(confAsMap)
-    val baseConf = sc.hadoopConfiguration()
-    val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+      confAsMap: java.util.HashMap[String, String],
+      batchSize: Int) = {
+    val mergedConf = getMergedConf(confAsMap, sc.hadoopConfiguration())
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
         Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
-    val keyConverter = Converter.getInstance(Option(keyConverterClass))
-    val valueConverter = Converter.getInstance(Option(valueConverterClass))
-    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, 
valueConverter)
-    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new WritableToJavaConverter(confBroadcasted, batchSize))
+    JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
   /**
@@ -494,15 +490,16 @@ private[spark] object PythonRDD extends Logging {
       valueClass: String,
       keyConverterClass: String,
       valueConverterClass: String,
-      confAsMap: java.util.HashMap[String, String]) = {
+      confAsMap: java.util.HashMap[String, String],
+      batchSize: Int) = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClass, keyClass, valueClass, conf)
-    val keyConverter = Converter.getInstance(Option(keyConverterClass))
-    val valueConverter = Converter.getInstance(Option(valueConverterClass))
-    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, 
valueConverter)
-    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new WritableToJavaConverter(confBroadcasted, batchSize))
+    JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
   private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](
@@ -512,18 +509,14 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration) = {
-    implicit val kcm = 
ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
-    implicit val vcm = 
ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
-    implicit val fcm = 
ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
-    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
-    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
-    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
-    val rdd = if (path.isDefined) {
+    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
+    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+    val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+    if (path.isDefined) {
       sc.sc.hadoopFile(path.get, fc, kc, vc)
     } else {
       sc.sc.hadoopRDD(new JobConf(conf), fc, kc, vc)
     }
-    rdd
   }
 
   def writeUTF(str: String, dataOut: DataOutputStream) {
@@ -562,6 +555,152 @@ private[spark] object PythonRDD extends Logging {
     }
   }
 
+  private def getMergedConf(confAsMap: java.util.HashMap[String, String],
+      baseConf: Configuration): Configuration = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    PythonHadoopUtil.mergeConfs(baseConf, conf)
+  }
+
+  private def inferKeyValueTypes[K, V](rdd: RDD[(K, V)], keyConverterClass: 
String = null,
+      valueConverterClass: String = null): (Class[_], Class[_]) = {
+    // Peek at an element to figure out key/value types. Since Writables are 
not serializable,
+    // we cannot call first() on the converted RDD. Instead, we call first() 
on the original RDD
+    // and then convert locally.
+    val (key, value) = rdd.first()
+    val (kc, vc) = getKeyValueConverters(keyConverterClass, 
valueConverterClass,
+      new JavaToWritableConverter)
+    (kc.convert(key).getClass, vc.convert(value).getClass)
+  }
+
+  private def getKeyValueTypes(keyClass: String, valueClass: String):
+      Option[(Class[_], Class[_])] = {
+    for {
+      k <- Option(keyClass)
+      v <- Option(valueClass)
+    } yield (Class.forName(k), Class.forName(v))
+  }
+
+  private def getKeyValueConverters(keyConverterClass: String, 
valueConverterClass: String,
+      defaultConverter: Converter[Any, Any]): (Converter[Any, Any], 
Converter[Any, Any]) = {
+    val keyConverter = Converter.getInstance(Option(keyConverterClass), 
defaultConverter)
+    val valueConverter = Converter.getInstance(Option(valueConverterClass), 
defaultConverter)
+    (keyConverter, valueConverter)
+  }
+
+  /**
+   * Convert an RDD of key-value pairs from internal types to serializable 
types suitable for
+   * output, or vice versa.
+   */
+  private def convertRDD[K, V](rdd: RDD[(K, V)],
+                               keyConverterClass: String,
+                               valueConverterClass: String,
+                               defaultConverter: Converter[Any, Any]): 
RDD[(Any, Any)] = {
+    val (kc, vc) = getKeyValueConverters(keyConverterClass, 
valueConverterClass,
+      defaultConverter)
+    PythonHadoopUtil.convertRDD(rdd, kc, vc)
+  }
+
+  /**
+   * Output a Python RDD of key-value pairs as a Hadoop SequenceFile using the 
Writable types
+   * we convert from the RDD's key and value types. Note that keys and values 
can't be
+   * [[org.apache.hadoop.io.Writable]] types already, since Writables are not 
Java
+   * `Serializable` and we can't peek at them. The `path` can be on any Hadoop 
file system.
+   */
+  def saveAsSequenceFile[K, V, C <: CompressionCodec](
+      pyRDD: JavaRDD[Array[Byte]],
+      batchSerialized: Boolean,
+      path: String,
+      compressionCodecClass: String) = {
+    saveAsHadoopFile(
+      pyRDD, batchSerialized, path, 
"org.apache.hadoop.mapred.SequenceFileOutputFormat",
+      null, null, null, null, new java.util.HashMap(), compressionCodecClass)
+  }
+
+  /**
+   * Output a Python RDD of key-value pairs to any Hadoop file system, using 
old Hadoop
+   * `OutputFormat` in mapred package. Keys and values are converted to 
suitable output
+   * types using either user specified converters or, if not specified,
+   * [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion 
types
+   * `keyClass` and `valueClass` are automatically inferred if not specified. 
The passed-in
+   * `confAsMap` is merged with the default Hadoop conf associated with the 
SparkContext of
+   * this RDD.
+   */
+  def saveAsHadoopFile[K, V, F <: OutputFormat[_, _], C <: CompressionCodec](
+      pyRDD: JavaRDD[Array[Byte]],
+      batchSerialized: Boolean,
+      path: String,
+      outputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      confAsMap: java.util.HashMap[String, String],
+      compressionCodecClass: String) = {
+    val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
+    val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
+      inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
+    val mergedConf = getMergedConf(confAsMap, 
pyRDD.context.hadoopConfiguration)
+    val codec = 
Option(compressionCodecClass).map(Class.forName(_).asInstanceOf[Class[C]])
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new JavaToWritableConverter)
+    val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+    converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), 
codec=codec)
+  }
+
+  /**
+   * Output a Python RDD of key-value pairs to any Hadoop file system, using 
new Hadoop
+   * `OutputFormat` in mapreduce package. Keys and values are converted to 
suitable output
+   * types using either user specified converters or, if not specified,
+   * [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion 
types
+   * `keyClass` and `valueClass` are automatically inferred if not specified. 
The passed-in
+   * `confAsMap` is merged with the default Hadoop conf associated with the 
SparkContext of
+   * this RDD.
+   */
+  def saveAsNewAPIHadoopFile[K, V, F <: NewOutputFormat[_, _]](
+      pyRDD: JavaRDD[Array[Byte]],
+      batchSerialized: Boolean,
+      path: String,
+      outputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
+    val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
+      inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
+    val mergedConf = getMergedConf(confAsMap, 
pyRDD.context.hadoopConfiguration)
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new JavaToWritableConverter)
+    val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+    converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf)
+  }
+
+  /**
+   * Output a Python RDD of key-value pairs to any Hadoop file system, using a 
Hadoop conf
+   * converted from the passed-in `confAsMap`. The conf should set relevant 
output params (
+   * e.g., output path, output format, etc), in the same way as it would be 
configured for
+   * a Hadoop MapReduce job. Both old and new Hadoop OutputFormat APIs are 
supported
+   * (mapred vs. mapreduce). Keys/values are converted for output using either 
user specified
+   * converters or, by default, 
[[org.apache.spark.api.python.JavaToWritableConverter]].
+   */
+  def saveAsHadoopDataset[K, V](
+      pyRDD: JavaRDD[Array[Byte]],
+      batchSerialized: Boolean,
+      confAsMap: java.util.HashMap[String, String],
+      keyConverterClass: String,
+      valueConverterClass: String,
+      useNewAPI: Boolean) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val converted = convertRDD(SerDeUtil.pythonToPairRDD(pyRDD, 
batchSerialized),
+      keyConverterClass, valueConverterClass, new JavaToWritableConverter)
+    if (useNewAPI) {
+      converted.saveAsNewAPIHadoopDataset(conf)
+    } else {
+      converted.saveAsHadoopDataset(new JobConf(conf))
+    }
+  }
+
   /**
    * Convert and RDD of Java objects to and RDD of serialized Python objects, 
that is usable by
    * PySpark.

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala 
b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 9a012e7..efc9009 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -17,13 +17,14 @@
 
 package org.apache.spark.api.python
 
-import scala.util.Try
-import org.apache.spark.rdd.RDD
-import org.apache.spark.Logging
-import scala.util.Success
+import scala.collection.JavaConversions._
 import scala.util.Failure
-import net.razorvine.pickle.Pickler
+import scala.util.Try
 
+import net.razorvine.pickle.{Unpickler, Pickler}
+
+import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.rdd.RDD
 
 /** Utilities for serialization / deserialization between Python and Java, 
using Pickle. */
 private[python] object SerDeUtil extends Logging {
@@ -65,20 +66,52 @@ private[python] object SerDeUtil extends Logging {
    * by PySpark. By default, if serialization fails, toString is called and 
the string
    * representation is serialized
    */
-  def rddToPython(rdd: RDD[(Any, Any)]): RDD[Array[Byte]] = {
+  def pairRDDToPython(rdd: RDD[(Any, Any)], batchSize: Int): RDD[Array[Byte]] 
= {
     val (keyFailed, valueFailed) = checkPickle(rdd.first())
     rdd.mapPartitions { iter =>
       val pickle = new Pickler
-      iter.map { case (k, v) =>
-        if (keyFailed && valueFailed) {
-          pickle.dumps(Array(k.toString, v.toString))
-        } else if (keyFailed) {
-          pickle.dumps(Array(k.toString, v))
-        } else if (!keyFailed && valueFailed) {
-          pickle.dumps(Array(k, v.toString))
+      val cleaned = iter.map { case (k, v) =>
+        val key = if (keyFailed) k.toString else k
+        val value = if (valueFailed) v.toString else v
+        Array[Any](key, value)
+      }
+      if (batchSize > 1) {
+        cleaned.grouped(batchSize).map(batched => 
pickle.dumps(seqAsJavaList(batched)))
+      } else {
+        cleaned.map(pickle.dumps(_))
+      }
+    }
+  }
+
+  /**
+   * Convert an RDD of serialized Python tuple (K, V) to RDD[(K, V)].
+   */
+  def pythonToPairRDD[K, V](pyRDD: RDD[Array[Byte]], batchSerialized: 
Boolean): RDD[(K, V)] = {
+    def isPair(obj: Any): Boolean = {
+      
Option(obj.getClass.getComponentType).map(!_.isPrimitive).getOrElse(false) &&
+        obj.asInstanceOf[Array[_]].length == 2
+    }
+    pyRDD.mapPartitions { iter =>
+      val unpickle = new Unpickler
+      val unpickled =
+        if (batchSerialized) {
+          iter.flatMap { batch =>
+            unpickle.loads(batch) match {
+              case objs: java.util.List[_] => collectionAsScalaIterable(objs)
+              case other => throw new SparkException(
+                s"Unexpected type ${other.getClass.getName} for batch 
serialized Python RDD")
+            }
+          }
         } else {
-          pickle.dumps(Array(k, v))
+          iter.map(unpickle.loads(_))
         }
+      unpickled.map {
+        case obj if isPair(obj) =>
+          // we only accept (K, V)
+          val arr = obj.asInstanceOf[Array[_]]
+          (arr.head.asInstanceOf[K], arr.last.asInstanceOf[V])
+        case other => throw new SparkException(
+          s"RDD element of type ${other.getClass.getName} cannot be used")
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
 
b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
index f0e3fb9..d11db97 100644
--- 
a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
+++ 
b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -17,15 +17,16 @@
 
 package org.apache.spark.api.python
 
-import org.apache.spark.SparkContext
-import org.apache.hadoop.io._
-import scala.Array
 import java.io.{DataOutput, DataInput}
+import java.nio.charset.Charset
+
+import org.apache.hadoop.io._
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
 import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.{SparkContext, SparkException}
 
 /**
- * A class to test MsgPack serialization on the Scala side, that will be 
deserialized
+ * A class to test Pyrolite serialization on the Scala side, that will be 
deserialized
  * in Python
  * @param str
  * @param int
@@ -54,7 +55,13 @@ case class TestWritable(var str: String, var int: Int, var 
double: Double) exten
   }
 }
 
-class TestConverter extends Converter[Any, Any] {
+private[python] class TestInputKeyConverter extends Converter[Any, Any] {
+  override def convert(obj: Any) = {
+    obj.asInstanceOf[IntWritable].get().toChar
+  }
+}
+
+private[python] class TestInputValueConverter extends Converter[Any, Any] {
   import collection.JavaConversions._
   override def convert(obj: Any) = {
     val m = obj.asInstanceOf[MapWritable]
@@ -62,6 +69,38 @@ class TestConverter extends Converter[Any, Any] {
   }
 }
 
+private[python] class TestOutputKeyConverter extends Converter[Any, Any] {
+  override def convert(obj: Any) = {
+    new Text(obj.asInstanceOf[Int].toString)
+  }
+}
+
+private[python] class TestOutputValueConverter extends Converter[Any, Any] {
+  import collection.JavaConversions._
+  override def convert(obj: Any) = {
+    new DoubleWritable(obj.asInstanceOf[java.util.Map[Double, 
_]].keySet().head)
+  }
+}
+
+private[python] class DoubleArrayWritable extends 
ArrayWritable(classOf[DoubleWritable])
+
+private[python] class DoubleArrayToWritableConverter extends Converter[Any, 
Writable] {
+  override def convert(obj: Any) = obj match {
+    case arr if arr.getClass.isArray && arr.getClass.getComponentType == 
classOf[Double] =>
+      val daw = new DoubleArrayWritable
+      daw.set(arr.asInstanceOf[Array[Double]].map(new DoubleWritable(_)))
+      daw
+    case other => throw new SparkException(s"Data of type $other is not 
supported")
+  }
+}
+
+private[python] class WritableToDoubleArrayConverter extends Converter[Any, 
Array[Double]] {
+  override def convert(obj: Any): Array[Double] = obj match {
+    case daw : DoubleArrayWritable => 
daw.get().map(_.asInstanceOf[DoubleWritable].get())
+    case other => throw new SparkException(s"Data of type $other is not 
supported")
+  }
+}
+
 /**
  * This object contains method to generate SequenceFile test data and write it 
to a
  * given directory (probably a temp directory)
@@ -97,7 +136,8 @@ object WriteInputFormatTestDataGenerator {
     sc.parallelize(intKeys).saveAsSequenceFile(intPath)
     sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) 
}).saveAsSequenceFile(doublePath)
     sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) 
}).saveAsSequenceFile(textPath)
-    sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes) 
}).saveAsSequenceFile(bytesPath)
+    sc.parallelize(intKeys.map{ case (k, v) => (k, 
v.getBytes(Charset.forName("UTF-8"))) }
+      ).saveAsSequenceFile(bytesPath)
     val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), 
(1, false))
     sc.parallelize(bools).saveAsSequenceFile(boolPath)
     sc.parallelize(intKeys).map{ case (k, v) =>
@@ -106,19 +146,20 @@ object WriteInputFormatTestDataGenerator {
 
     // Create test data for ArrayWritable
     val data = Seq(
-      (1, Array(1.0, 2.0, 3.0)),
+      (1, Array()),
       (2, Array(3.0, 4.0, 5.0)),
       (3, Array(4.0, 5.0, 6.0))
     )
     sc.parallelize(data, numSlices = 2)
       .map{ case (k, v) =>
-      (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], 
v.map(new DoubleWritable(_))))
-    }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, 
ArrayWritable]](arrPath)
+        val va = new DoubleArrayWritable
+        va.set(v.map(new DoubleWritable(_)))
+        (new IntWritable(k), va)
+    }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, 
DoubleArrayWritable]](arrPath)
 
     // Create test data for MapWritable, with keys DoubleWritable and values 
Text
     val mapData = Seq(
-      (1, Map(2.0 -> "aa")),
-      (2, Map(3.0 -> "bb")),
+      (1, Map()),
       (2, Map(1.0 -> "cc")),
       (3, Map(2.0 -> "dd")),
       (2, Map(1.0 -> "aa")),
@@ -126,9 +167,9 @@ object WriteInputFormatTestDataGenerator {
     )
     sc.parallelize(mapData, numSlices = 2).map{ case (i, m) =>
       val mw = new MapWritable()
-      val k = m.keys.head
-      val v = m.values.head
-      mw.put(new DoubleWritable(k), new Text(v))
+      m.foreach { case (k, v) =>
+        mw.put(new DoubleWritable(k), new Text(v))
+      }
       (new IntWritable(i), mw)
     }.saveAsSequenceFile(mapPath)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/docs/programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 90c6971..a88bf27 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -383,16 +383,16 @@ Apart from text files, Spark's Python API also supports 
several other data forma
 
 * `RDD.saveAsPickleFile` and `SparkContext.pickleFile` support saving an RDD 
in a simple format consisting of pickled Python objects. Batching is used on 
pickle serialization, with default batch size 10.
 
-* Details on reading `SequenceFile` and arbitrary Hadoop `InputFormat` are 
given below.
-
-### SequenceFile and Hadoop InputFormats
+* SequenceFile and Hadoop Input/Output Formats
 
 **Note** this feature is currently marked ```Experimental``` and is intended 
for advanced users. It may be replaced in future with read/write support based 
on SparkSQL, in which case SparkSQL is the preferred approach.
 
-#### Writable Support
+**Writable Support**
 
-PySpark SequenceFile support loads an RDD within Java, and pickles the 
resulting Java objects using
-[Pyrolite](https://github.com/irmen/Pyrolite/). The following Writables are 
automatically converted:
+PySpark SequenceFile support loads an RDD of key-value pairs within Java, 
converts Writables to base Java types, and pickles the 
+resulting Java objects using [Pyrolite](https://github.com/irmen/Pyrolite/). 
When saving an RDD of key-value pairs to SequenceFile, 
+PySpark does the reverse. It unpickles Python objects into Java objects and 
then converts them to Writables. The following 
+Writables are automatically converted:
 
 <table class="table">
 <tr><th>Writable Type</th><th>Python Type</th></tr>
@@ -403,32 +403,30 @@ PySpark SequenceFile support loads an RDD within Java, 
and pickles the resulting
 <tr><td>BooleanWritable</td><td>bool</td></tr>
 <tr><td>BytesWritable</td><td>bytearray</td></tr>
 <tr><td>NullWritable</td><td>None</td></tr>
-<tr><td>ArrayWritable</td><td>list of primitives, or tuple of objects</td></tr>
 <tr><td>MapWritable</td><td>dict</td></tr>
-<tr><td>Custom Class conforming to Java Bean conventions</td>
-    <td>dict of public properties (via JavaBean getters and setters) + 
__class__ for the class type</td></tr>
 </table>
 
-#### Loading SequenceFiles
+Arrays are not handled out-of-the-box. Users need to specify custom 
`ArrayWritable` subtypes when reading or writing. When writing, 
+users also need to specify custom converters that convert arrays to custom 
`ArrayWritable` subtypes. When reading, the default 
+converter will convert custom `ArrayWritable` subtypes to Java `Object[]`, 
which then get pickled to Python tuples. To get 
+Python `array.array` for arrays of primitive types, users need to specify 
custom converters.
+
+**Saving and Loading SequenceFiles**
 
-Similarly to text files, SequenceFiles can be loaded by specifying the path. 
The key and value
+Similarly to text files, SequenceFiles can be saved and loaded by specifying 
the path. The key and value
 classes can be specified, but for standard Writables this is not required.
 
 {% highlight python %}
->>> rdd = sc.sequenceFile("path/to/sequencefile/of/doubles")
->>> rdd.collect()         # this example has DoubleWritable keys and Text 
values
-[(1.0, u'aa'),
- (2.0, u'bb'),
- (2.0, u'aa'),
- (3.0, u'cc'),
- (2.0, u'bb'),
- (1.0, u'aa')]
+>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
+>>> rdd.saveAsSequenceFile("path/to/file")
+>>> sorted(sc.sequenceFile("path/to/file").collect())
+[(1, u'a'), (2, u'aa'), (3, u'aaa')]
 {% endhighlight %}
 
-#### Loading Other Hadoop InputFormats
+**Saving and Loading Other Hadoop Input/Output Formats**
 
-PySpark can also read any Hadoop InputFormat, for both 'new' and 'old' Hadoop 
APIs. If required,
-a Hadoop configuration can be passed in as a Python dict. Here is an example 
using the
+PySpark can also read any Hadoop InputFormat or write any Hadoop OutputFormat, 
for both 'new' and 'old' Hadoop MapReduce APIs. 
+If required, a Hadoop configuration can be passed in as a Python dict. Here is 
an example using the
 Elasticsearch ESInputFormat:
 
 {% highlight python %}
@@ -447,8 +445,7 @@ Note that, if the InputFormat simply depends on a Hadoop 
configuration and/or in
 the key and value classes can easily be converted according to the above table,
 then this approach should work well for such cases.
 
-If you have custom serialized binary data (such as loading data from Cassandra 
/ HBase) or custom
-classes that don't conform to the JavaBean requirements, then you will first 
need to 
+If you have custom serialized binary data (such as loading data from Cassandra 
/ HBase), then you will first need to 
 transform that data on the Scala/Java side to something which can be handled 
by Pyrolite's pickler.
 A [Converter](api/scala/index.html#org.apache.spark.api.python.Converter) 
trait is provided 
 for this. Simply extend this trait and implement your transformation code in 
the ```convert``` 
@@ -456,11 +453,8 @@ method. Remember to ensure that this class, along with any 
dependencies required
 classpath.
 
 See the [Python 
examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python) and 
-the [Converter 
examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/pythonconverters)
 
-for examples of using HBase and Cassandra ```InputFormat```.
-
-Future support for writing data out as ```SequenceFileOutputFormat``` and 
other ```OutputFormats```, 
-is forthcoming.
+the [Converter 
examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/pythonconverters)
 
+for examples of using Cassandra / HBase ```InputFormat``` and 
```OutputFormat``` with custom converters.
 
 </div>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/examples/src/main/python/cassandra_outputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/cassandra_outputformat.py 
b/examples/src/main/python/cassandra_outputformat.py
new file mode 100644
index 0000000..1dfbf98
--- /dev/null
+++ b/examples/src/main/python/cassandra_outputformat.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+
+"""
+Create data in Cassandra fist
+(following: https://wiki.apache.org/cassandra/GettingStarted)
+
+cqlsh> CREATE KEYSPACE test
+   ... WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 
1 };
+cqlsh> use test;
+cqlsh:test> CREATE TABLE users (
+        ...   user_id int PRIMARY KEY,
+        ...   fname text,
+        ...   lname text
+        ... );
+
+> cassandra_outputformat <host> test users 1745 john smith
+> cassandra_outputformat <host> test users 1744 john doe
+> cassandra_outputformat <host> test users 1746 john smith
+
+cqlsh:test> SELECT * FROM users;
+
+ user_id | fname | lname
+---------+-------+-------
+    1745 |  john | smith
+    1744 |  john |   doe
+    1746 |  john | smith
+"""
+if __name__ == "__main__":
+    if len(sys.argv) != 7:
+        print >> sys.stderr, """
+        Usage: cassandra_outputformat <host> <keyspace> <cf> <user_id> <fname> 
<lname>
+
+        Run with example jar:
+        ./bin/spark-submit --driver-class-path /path/to/example/jar 
/path/to/examples/cassandra_outputformat.py <args>
+        Assumes you have created the following table <cf> in Cassandra already,
+        running on <host>, in <keyspace>.
+
+        cqlsh:<keyspace>> CREATE TABLE <cf> (
+           ...   user_id int PRIMARY KEY,
+           ...   fname text,
+           ...   lname text
+           ... );
+        """
+        exit(-1)
+
+    host = sys.argv[1]
+    keyspace = sys.argv[2]
+    cf = sys.argv[3]
+    sc = SparkContext(appName="CassandraOutputFormat")
+
+    conf = {"cassandra.output.thrift.address":host,
+            "cassandra.output.thrift.port":"9160",
+            "cassandra.output.keyspace":keyspace,
+            "cassandra.output.partitioner.class":"Murmur3Partitioner",
+            "cassandra.output.cql":"UPDATE " + keyspace + "." + cf + " SET 
fname = ?, lname = ?",
+            "mapreduce.output.basename":cf,
+            
"mapreduce.outputformat.class":"org.apache.cassandra.hadoop.cql3.CqlOutputFormat",
+            "mapreduce.job.output.key.class":"java.util.Map",
+            "mapreduce.job.output.value.class":"java.util.List"}
+    key = {"user_id" : int(sys.argv[4])}
+    sc.parallelize([(key, sys.argv[5:])]).saveAsNewAPIHadoopDataset(
+        conf=conf,
+        
keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter",
+        
valueConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLValueConverter")

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/examples/src/main/python/hbase_inputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/hbase_inputformat.py 
b/examples/src/main/python/hbase_inputformat.py
index 3289d98..c9fa8e1 100644
--- a/examples/src/main/python/hbase_inputformat.py
+++ b/examples/src/main/python/hbase_inputformat.py
@@ -65,7 +65,8 @@ if __name__ == "__main__":
         "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
         "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
         "org.apache.hadoop.hbase.client.Result",
-        
valueConverter="org.apache.spark.examples.pythonconverters.HBaseConverter",
+        
keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
+        
valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
         conf=conf)
     output = hbase_rdd.collect()
     for (k, v) in output:

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/examples/src/main/python/hbase_outputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/hbase_outputformat.py 
b/examples/src/main/python/hbase_outputformat.py
new file mode 100644
index 0000000..5e11548
--- /dev/null
+++ b/examples/src/main/python/hbase_outputformat.py
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+
+"""
+Create test table in HBase first:
+
+hbase(main):001:0> create 'test', 'f1'
+0 row(s) in 0.7840 seconds
+
+> hbase_outputformat <host> test row1 f1 q1 value1
+> hbase_outputformat <host> test row2 f1 q1 value2
+> hbase_outputformat <host> test row3 f1 q1 value3
+> hbase_outputformat <host> test row4 f1 q1 value4
+
+hbase(main):002:0> scan 'test'
+ROW                   COLUMN+CELL
+ row1                 column=f1:q1, timestamp=1405659615726, value=value1
+ row2                 column=f1:q1, timestamp=1405659626803, value=value2
+ row3                 column=f1:q1, timestamp=1405659640106, value=value3
+ row4                 column=f1:q1, timestamp=1405659650292, value=value4
+4 row(s) in 0.0780 seconds
+"""
+if __name__ == "__main__":
+    if len(sys.argv) != 7:
+        print >> sys.stderr, """
+        Usage: hbase_outputformat <host> <table> <row> <family> <qualifier> 
<value>
+
+        Run with example jar:
+        ./bin/spark-submit --driver-class-path /path/to/example/jar 
/path/to/examples/hbase_outputformat.py <args>
+        Assumes you have created <table> with column family <family> in HBase 
running on <host> already
+        """
+        exit(-1)
+
+    host = sys.argv[1]
+    table = sys.argv[2]
+    sc = SparkContext(appName="HBaseOutputFormat")
+
+    conf = {"hbase.zookeeper.quorum": host,
+            "hbase.mapred.outputtable": table,
+            "mapreduce.outputformat.class" : 
"org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
+            "mapreduce.job.output.key.class" : 
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",
+            "mapreduce.job.output.value.class" : 
"org.apache.hadoop.io.Writable"}
+
+    sc.parallelize([sys.argv[3:]]).map(lambda x: (x[0], 
x)).saveAsNewAPIHadoopDataset(
+        conf=conf,
+        
keyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
+        
valueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter")

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
 
b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
index 29a65c7..83feb57 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
@@ -20,7 +20,7 @@ package org.apache.spark.examples.pythonconverters
 import org.apache.spark.api.python.Converter
 import java.nio.ByteBuffer
 import org.apache.cassandra.utils.ByteBufferUtil
-import collection.JavaConversions.{mapAsJavaMap, mapAsScalaMap}
+import collection.JavaConversions._
 
 
 /**
@@ -44,3 +44,25 @@ class CassandraCQLValueConverter extends Converter[Any, 
java.util.Map[String, St
     mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb)))
   }
 }
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * Map[String, Int] to Cassandra key
+ */
+class ToCassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, 
ByteBuffer]] {
+  override def convert(obj: Any): java.util.Map[String, ByteBuffer] = {
+    val input = obj.asInstanceOf[java.util.Map[String, Int]]
+    mapAsJavaMap(input.mapValues(i => ByteBufferUtil.bytes(i)))
+  }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * List[String] to Cassandra value
+ */
+class ToCassandraCQLValueConverter extends Converter[Any, 
java.util.List[ByteBuffer]] {
+  override def convert(obj: Any): java.util.List[ByteBuffer] = {
+    val input = obj.asInstanceOf[java.util.List[String]]
+    seqAsJavaList(input.map(s => ByteBufferUtil.bytes(s)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala
 
b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala
deleted file mode 100644
index 42ae960..0000000
--- 
a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.pythonconverters
-
-import org.apache.spark.api.python.Converter
-import org.apache.hadoop.hbase.client.Result
-import org.apache.hadoop.hbase.util.Bytes
-
-/**
- * Implementation of [[org.apache.spark.api.python.Converter]] that converts a 
HBase Result
- * to a String
- */
-class HBaseConverter extends Converter[Any, String] {
-  override def convert(obj: Any): String = {
-    val result = obj.asInstanceOf[Result]
-    Bytes.toStringBinary(result.value())
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
 
b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
new file mode 100644
index 0000000..273bee0
--- /dev/null
+++ 
b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.pythonconverters
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.api.python.Converter
+import org.apache.hadoop.hbase.client.{Put, Result}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts an
+ * HBase Result to a String
+ */
+class HBaseResultToStringConverter extends Converter[Any, String] {
+  override def convert(obj: Any): String = {
+    val result = obj.asInstanceOf[Result]
+    Bytes.toStringBinary(result.value())
+  }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts an
+ * ImmutableBytesWritable to a String
+ */
+class ImmutableBytesWritableToStringConverter extends Converter[Any, String] {
+  override def convert(obj: Any): String = {
+    val key = obj.asInstanceOf[ImmutableBytesWritable]
+    Bytes.toStringBinary(key.get())
+  }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * String to an ImmutableBytesWritable
+ */
+class StringToImmutableBytesWritableConverter extends Converter[Any, 
ImmutableBytesWritable] {
+  override def convert(obj: Any): ImmutableBytesWritable = {
+    val bytes = Bytes.toBytes(obj.asInstanceOf[String])
+    new ImmutableBytesWritable(bytes)
+  }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * list of Strings to HBase Put
+ */
+class StringListToPutConverter extends Converter[Any, Put] {
+  override def convert(obj: Any): Put = {
+    val output = 
obj.asInstanceOf[java.util.ArrayList[String]].map(Bytes.toBytes(_)).toArray
+    val put = new Put(output(0))
+    put.add(output(1), output(2), output(3))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 830a6ee..7b0f8d8 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -60,6 +60,7 @@ class SparkContext(object):
     _active_spark_context = None
     _lock = Lock()
     _python_includes = None  # zip and egg files that need to be added to 
PYTHONPATH
+    _default_batch_size_for_serialized_input = 10
 
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                  environment=None, batchSize=1024, 
serializer=PickleSerializer(), conf=None,
@@ -378,7 +379,7 @@ class SparkContext(object):
         return jm
 
     def sequenceFile(self, path, keyClass=None, valueClass=None, 
keyConverter=None,
-                     valueConverter=None, minSplits=None):
+                     valueConverter=None, minSplits=None, batchSize=None):
         """
         Read a Hadoop SequenceFile with arbitrary key and value Writable class 
from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported 
file system URI.
@@ -398,14 +399,18 @@ class SparkContext(object):
         @param valueConverter:
         @param minSplits: minimum splits in dataset
                (default min(2, sc.defaultParallelism))
+        @param batchSize: The number of Python objects represented as a single
+               Java object. (default 
sc._default_batch_size_for_serialized_input)
         """
         minSplits = minSplits or min(self.defaultParallelism, 2)
+        batchSize = max(1, batchSize or 
self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else 
PickleSerializer()
         jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, 
valueClass,
-                                                keyConverter, valueConverter, 
minSplits)
-        return RDD(jrdd, self, PickleSerializer())
+                    keyConverter, valueConverter, minSplits, batchSize)
+        return RDD(jrdd, self, ser)
 
     def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, 
keyConverter=None,
-                         valueConverter=None, conf=None):
+                         valueConverter=None, conf=None, batchSize=None):
         """
         Read a 'new API' Hadoop InputFormat with arbitrary key and value class 
from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported 
file system URI.
@@ -425,14 +430,18 @@ class SparkContext(object):
         @param valueConverter: (None by default)
         @param conf: Hadoop configuration, passed in as a dict
                (None by default)
+        @param batchSize: The number of Python objects represented as a single
+               Java object. (default 
sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
+        batchSize = max(1, batchSize or 
self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else 
PickleSerializer()
         jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, 
inputFormatClass, keyClass,
-                                                    valueClass, keyConverter, 
valueConverter, jconf)
-        return RDD(jrdd, self, PickleSerializer())
+                    valueClass, keyConverter, valueConverter, jconf, batchSize)
+        return RDD(jrdd, self, ser)
 
     def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, 
keyConverter=None,
-                        valueConverter=None, conf=None):
+                        valueConverter=None, conf=None, batchSize=None):
         """
         Read a 'new API' Hadoop InputFormat with arbitrary key and value 
class, from an arbitrary
         Hadoop configuration, which is passed in as a Python dict.
@@ -449,14 +458,18 @@ class SparkContext(object):
         @param valueConverter: (None by default)
         @param conf: Hadoop configuration, passed in as a dict
                (None by default)
+        @param batchSize: The number of Python objects represented as a single
+               Java object. (default 
sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
+        batchSize = max(1, batchSize or 
self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else 
PickleSerializer()
         jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, 
inputFormatClass, keyClass,
-                                                   valueClass, keyConverter, 
valueConverter, jconf)
-        return RDD(jrdd, self, PickleSerializer())
+                    valueClass, keyConverter, valueConverter, jconf, batchSize)
+        return RDD(jrdd, self, ser)
 
     def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, 
keyConverter=None,
-                   valueConverter=None, conf=None):
+                   valueConverter=None, conf=None, batchSize=None):
         """
         Read an 'old' Hadoop InputFormat with arbitrary key and value class 
from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported 
file system URI.
@@ -476,14 +489,18 @@ class SparkContext(object):
         @param valueConverter: (None by default)
         @param conf: Hadoop configuration, passed in as a dict
                (None by default)
+        @param batchSize: The number of Python objects represented as a single
+               Java object. (default 
sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
+        batchSize = max(1, batchSize or 
self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else 
PickleSerializer()
         jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, 
inputFormatClass, keyClass,
-                                              valueClass, keyConverter, 
valueConverter, jconf)
-        return RDD(jrdd, self, PickleSerializer())
+                    valueClass, keyConverter, valueConverter, jconf, batchSize)
+        return RDD(jrdd, self, ser)
 
     def hadoopRDD(self, inputFormatClass, keyClass, valueClass, 
keyConverter=None,
-                  valueConverter=None, conf=None):
+                  valueConverter=None, conf=None, batchSize=None):
         """
         Read an 'old' Hadoop InputFormat with arbitrary key and value class, 
from an arbitrary
         Hadoop configuration, which is passed in as a Python dict.
@@ -500,11 +517,15 @@ class SparkContext(object):
         @param valueConverter: (None by default)
         @param conf: Hadoop configuration, passed in as a dict
                (None by default)
+        @param batchSize: The number of Python objects represented as a single
+               Java object. (default 
sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
+        batchSize = max(1, batchSize or 
self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else 
PickleSerializer()
         jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, 
keyClass, valueClass,
-                                             keyConverter, valueConverter, 
jconf)
-        return RDD(jrdd, self, PickleSerializer())
+                    keyConverter, valueConverter, jconf, batchSize)
+        return RDD(jrdd, self, ser)
 
     def _checkpointFile(self, name, input_deserializer):
         jrdd = self._jsc.checkpointFile(name)

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index b84d976..e8fcc90 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -231,6 +231,13 @@ class RDD(object):
         self._jrdd_deserializer = jrdd_deserializer
         self._id = jrdd.id()
 
+    def _toPickleSerialization(self):
+        if (self._jrdd_deserializer == PickleSerializer() or
+            self._jrdd_deserializer == BatchedSerializer(PickleSerializer())):
+            return self
+        else:
+            return self._reserialize(BatchedSerializer(PickleSerializer(), 10))
+
     def id(self):
         """
         A unique ID for this RDD (within its SparkContext).
@@ -1030,6 +1037,113 @@ class RDD(object):
         """
         return self.take(1)[0]
 
+    def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, 
valueConverter=None):
+        """
+        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any 
Hadoop file
+        system, using the new Hadoop OutputFormat API (mapreduce package). 
Keys/values are
+        converted for output using either user specified converters or, by 
default,
+        L{org.apache.spark.api.python.JavaToWritableConverter}.
+
+        @param conf: Hadoop job configuration, passed in as a dict
+        @param keyConverter: (None by default)
+        @param valueConverter: (None by default)
+        """
+        jconf = self.ctx._dictToJavaMap(conf)
+        pickledRDD = self._toPickleSerialization()
+        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+        self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, 
jconf,
+                                                    keyConverter, 
valueConverter, True)
+
+    def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, 
valueClass=None,
+                               keyConverter=None, valueConverter=None, 
conf=None):
+        """
+        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any 
Hadoop file
+        system, using the new Hadoop OutputFormat API (mapreduce package). Key 
and value types
+        will be inferred if not specified. Keys and values are converted for 
output using either
+        user specified converters or 
L{org.apache.spark.api.python.JavaToWritableConverter}. The
+        C{conf} is applied on top of the base Hadoop conf associated with the 
SparkContext
+        of this RDD to create a merged Hadoop MapReduce job configuration for 
saving the data.
+
+        @param path: path to Hadoop file
+        @param outputFormatClass: fully qualified classname of Hadoop 
OutputFormat
+               (e.g. 
"org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
+        @param keyClass: fully qualified classname of key Writable class
+               (e.g. "org.apache.hadoop.io.IntWritable", None by default)
+        @param valueClass: fully qualified classname of value Writable class
+               (e.g. "org.apache.hadoop.io.Text", None by default)
+        @param keyConverter: (None by default)
+        @param valueConverter: (None by default)
+        @param conf: Hadoop job configuration, passed in as a dict (None by 
default)
+        """
+        jconf = self.ctx._dictToJavaMap(conf)
+        pickledRDD = self._toPickleSerialization()
+        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+        self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, 
batched, path,
+            outputFormatClass, keyClass, valueClass, keyConverter, 
valueConverter, jconf)
+
+    def saveAsHadoopDataset(self, conf, keyConverter=None, 
valueConverter=None):
+        """
+        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any 
Hadoop file
+        system, using the old Hadoop OutputFormat API (mapred package). 
Keys/values are
+        converted for output using either user specified converters or, by 
default,
+        L{org.apache.spark.api.python.JavaToWritableConverter}.
+
+        @param conf: Hadoop job configuration, passed in as a dict
+        @param keyConverter: (None by default)
+        @param valueConverter: (None by default)
+        """
+        jconf = self.ctx._dictToJavaMap(conf)
+        pickledRDD = self._toPickleSerialization()
+        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+        self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, 
jconf,
+                                                    keyConverter, 
valueConverter, False)
+
+    def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, 
valueClass=None,
+                         keyConverter=None, valueConverter=None, conf=None,
+                         compressionCodecClass=None):
+        """
+        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any 
Hadoop file
+        system, using the old Hadoop OutputFormat API (mapred package). Key 
and value types
+        will be inferred if not specified. Keys and values are converted for 
output using either
+        user specified converters or 
L{org.apache.spark.api.python.JavaToWritableConverter}. The
+        C{conf} is applied on top of the base Hadoop conf associated with the 
SparkContext
+        of this RDD to create a merged Hadoop MapReduce job configuration for 
saving the data.
+
+        @param path: path to Hadoop file
+        @param outputFormatClass: fully qualified classname of Hadoop 
OutputFormat
+               (e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat")
+        @param keyClass: fully qualified classname of key Writable class
+               (e.g. "org.apache.hadoop.io.IntWritable", None by default)
+        @param valueClass: fully qualified classname of value Writable class
+               (e.g. "org.apache.hadoop.io.Text", None by default)
+        @param keyConverter: (None by default)
+        @param valueConverter: (None by default)
+        @param conf: (None by default)
+        @param compressionCodecClass: (None by default)
+        """
+        jconf = self.ctx._dictToJavaMap(conf)
+        pickledRDD = self._toPickleSerialization()
+        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+        self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, 
path,
+            outputFormatClass, keyClass, valueClass, keyConverter, 
valueConverter,
+            jconf, compressionCodecClass)
+
+    def saveAsSequenceFile(self, path, compressionCodecClass=None):
+        """
+        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any 
Hadoop file
+        system, using the L{org.apache.hadoop.io.Writable} types that we 
convert from the
+        RDD's key and value types. The mechanism is as follows:
+            1. Pyrolite is used to convert pickled Python RDD into RDD of Java 
objects.
+            2. Keys and values of this Java RDD are converted to Writables and 
written out.
+
+        @param path: path to sequence file
+        @param compressionCodecClass: (None by default)
+        """
+        pickledRDD = self._toPickleSerialization()
+        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+        self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, batched,
+                                                   path, compressionCodecClass)
+
     def saveAsPickleFile(self, path, batchSize=10):
         """
         Save this RDD as a SequenceFile of serialized objects. The serializer

Reply via email to