Repository: spark
Updated Branches:
  refs/heads/master 437dc8c5b -> 94d1f46fc


http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 8486c85..c29deb9 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -19,6 +19,7 @@
 Unit tests for PySpark; additional tests are implemented as doctests in
 individual modules.
 """
+from array import array
 from fileinput import input
 from glob import glob
 import os
@@ -327,6 +328,17 @@ class TestInputFormat(PySparkTestCase):
         ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
         self.assertEqual(doubles, ed)
 
+        bytes = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbytes/",
+                                            "org.apache.hadoop.io.IntWritable",
+                                            "org.apache.hadoop.io.BytesWritable").collect())
+        ebs = [(1, bytearray('aa', 'utf-8')),
+               (1, bytearray('aa', 'utf-8')),
+               (2, bytearray('aa', 'utf-8')),
+               (2, bytearray('bb', 'utf-8')),
+               (2, bytearray('bb', 'utf-8')),
+               (3, bytearray('cc', 'utf-8'))]
+        self.assertEqual(bytes, ebs)
+
         text = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sftext/",
                                            "org.apache.hadoop.io.Text",
                                            "org.apache.hadoop.io.Text").collect())
@@ -353,14 +365,34 @@ class TestInputFormat(PySparkTestCase):
         maps = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfmap/",
                                            "org.apache.hadoop.io.IntWritable",
                                            "org.apache.hadoop.io.MapWritable").collect())
-        em = [(1, {2.0: u'aa'}),
+        em = [(1, {}),
               (1, {3.0: u'bb'}),
               (2, {1.0: u'aa'}),
               (2, {1.0: u'cc'}),
-              (2, {3.0: u'bb'}),
               (3, {2.0: u'dd'})]
         self.assertEqual(maps, em)
 
+        # arrays get pickled to tuples by default
+        tuples = sorted(self.sc.sequenceFile(
+            basepath + "/sftestdata/sfarray/",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.spark.api.python.DoubleArrayWritable").collect())
+        et = [(1, ()),
+              (2, (3.0, 4.0, 5.0)),
+              (3, (4.0, 5.0, 6.0))]
+        self.assertEqual(tuples, et)
+
+        # with custom converters, primitive arrays can stay as arrays
+        arrays = sorted(self.sc.sequenceFile(
+            basepath + "/sftestdata/sfarray/",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.spark.api.python.DoubleArrayWritable",
+            valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
+        ea = [(1, array('d')),
+              (2, array('d', [3.0, 4.0, 5.0])),
+              (3, array('d', [4.0, 5.0, 6.0]))]
+        self.assertEqual(arrays, ea)
+
         clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
                                             "org.apache.hadoop.io.Text",
                                             "org.apache.spark.api.python.TestWritable").collect())
@@ -369,6 +401,12 @@ class TestInputFormat(PySparkTestCase):
                u'double': 54.0, u'int': 123, u'str': u'test1'})
         self.assertEqual(clazz[0], ec)
 
+        unbatched_clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
+                                            "org.apache.hadoop.io.Text",
+                                            "org.apache.spark.api.python.TestWritable",
+                                            batchSize=1).collect())
+        self.assertEqual(unbatched_clazz[0], ec)
+
     def test_oldhadoop(self):
         basepath = self.tempdir.name
         ints = sorted(self.sc.hadoopFile(basepath + "/sftestdata/sfint/",
@@ -379,10 +417,11 @@ class TestInputFormat(PySparkTestCase):
         self.assertEqual(ints, ei)
 
         hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
-        hello = self.sc.hadoopFile(hellopath,
-                                   "org.apache.hadoop.mapred.TextInputFormat",
-                                   "org.apache.hadoop.io.LongWritable",
-                                   "org.apache.hadoop.io.Text").collect()
+        oldconf = {"mapred.input.dir" : hellopath}
+        hello = self.sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
+                                  "org.apache.hadoop.io.LongWritable",
+                                  "org.apache.hadoop.io.Text",
+                                  conf=oldconf).collect()
         result = [(0, u'Hello World!')]
         self.assertEqual(hello, result)
 
@@ -397,10 +436,11 @@ class TestInputFormat(PySparkTestCase):
         self.assertEqual(ints, ei)
 
         hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
-        hello = self.sc.newAPIHadoopFile(hellopath,
-                                         "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
-                                         "org.apache.hadoop.io.LongWritable",
-                                         "org.apache.hadoop.io.Text").collect()
+        newconf = {"mapred.input.dir" : hellopath}
+        hello = self.sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
+                                        "org.apache.hadoop.io.LongWritable",
+                                        "org.apache.hadoop.io.Text",
+                                        conf=newconf).collect()
         result = [(0, u'Hello World!')]
         self.assertEqual(hello, result)
 
@@ -435,16 +475,267 @@ class TestInputFormat(PySparkTestCase):
             "org.apache.hadoop.io.IntWritable",
             "org.apache.hadoop.io.Text"))
 
-    def test_converter(self):
+    def test_converters(self):
+        # use of custom converters
         basepath = self.tempdir.name
         maps = sorted(self.sc.sequenceFile(
             basepath + "/sftestdata/sfmap/",
             "org.apache.hadoop.io.IntWritable",
             "org.apache.hadoop.io.MapWritable",
-            valueConverter="org.apache.spark.api.python.TestConverter").collect())
-        em = [(1, [2.0]), (1, [3.0]), (2, [1.0]), (2, [1.0]), (2, [3.0]), (3, [2.0])]
+            keyConverter="org.apache.spark.api.python.TestInputKeyConverter",
+            valueConverter="org.apache.spark.api.python.TestInputValueConverter").collect())
+        em = [(u'\x01', []),
+              (u'\x01', [3.0]),
+              (u'\x02', [1.0]),
+              (u'\x02', [1.0]),
+              (u'\x03', [2.0])]
+        self.assertEqual(maps, em)
+
+class TestOutputFormat(PySparkTestCase):
+
+    def setUp(self):
+        PySparkTestCase.setUp(self)
+        self.tempdir = tempfile.NamedTemporaryFile(delete=False)
+        os.unlink(self.tempdir.name)
+
+    def tearDown(self):
+        PySparkTestCase.tearDown(self)
+        shutil.rmtree(self.tempdir.name, ignore_errors=True)
+
+    def test_sequencefiles(self):
+        basepath = self.tempdir.name
+        ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
+        self.sc.parallelize(ei).saveAsSequenceFile(basepath + "/sfint/")
+        ints = sorted(self.sc.sequenceFile(basepath + "/sfint/").collect())
+        self.assertEqual(ints, ei)
+
+        ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
+        self.sc.parallelize(ed).saveAsSequenceFile(basepath + "/sfdouble/")
+        doubles = sorted(self.sc.sequenceFile(basepath + "/sfdouble/").collect())
+        self.assertEqual(doubles, ed)
+
+        ebs = [(1, bytearray(b'\x00\x07spam\x08')), (2, bytearray(b'\x00\x07spam\x08'))]
+        self.sc.parallelize(ebs).saveAsSequenceFile(basepath + "/sfbytes/")
+        bytes = sorted(self.sc.sequenceFile(basepath + "/sfbytes/").collect())
+        self.assertEqual(bytes, ebs)
+
+        et = [(u'1', u'aa'),
+              (u'2', u'bb'),
+              (u'3', u'cc')]
+        self.sc.parallelize(et).saveAsSequenceFile(basepath + "/sftext/")
+        text = sorted(self.sc.sequenceFile(basepath + "/sftext/").collect())
+        self.assertEqual(text, et)
+
+        eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)]
+        self.sc.parallelize(eb).saveAsSequenceFile(basepath + "/sfbool/")
+        bools = sorted(self.sc.sequenceFile(basepath + "/sfbool/").collect())
+        self.assertEqual(bools, eb)
+
+        en = [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)]
+        self.sc.parallelize(en).saveAsSequenceFile(basepath + "/sfnull/")
+        nulls = sorted(self.sc.sequenceFile(basepath + "/sfnull/").collect())
+        self.assertEqual(nulls, en)
+
+        em = [(1, {}),
+              (1, {3.0: u'bb'}),
+              (2, {1.0: u'aa'}),
+              (2, {1.0: u'cc'}),
+              (3, {2.0: u'dd'})]
+        self.sc.parallelize(em).saveAsSequenceFile(basepath + "/sfmap/")
+        maps = sorted(self.sc.sequenceFile(basepath + "/sfmap/").collect())
         self.assertEqual(maps, em)
 
+    def test_oldhadoop(self):
+        basepath = self.tempdir.name
+        dict_data = [(1, {}),
+                     (1, {"row1" : 1.0}),
+                     (2, {"row2" : 2.0})]
+        self.sc.parallelize(dict_data).saveAsHadoopFile(
+            basepath + "/oldhadoop/",
+            "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.MapWritable")
+        result = sorted(self.sc.hadoopFile(
+            basepath + "/oldhadoop/",
+            "org.apache.hadoop.mapred.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.MapWritable").collect())
+        self.assertEqual(result, dict_data)
+
+        conf = {
+            "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+            "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+            "mapred.output.value.class" : "org.apache.hadoop.io.MapWritable",
+            "mapred.output.dir" : basepath + "/olddataset/"}
+        self.sc.parallelize(dict_data).saveAsHadoopDataset(conf)
+        input_conf = {"mapred.input.dir" : basepath + "/olddataset/"}
+        old_dataset = sorted(self.sc.hadoopRDD(
+            "org.apache.hadoop.mapred.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.MapWritable",
+            conf=input_conf).collect())
+        self.assertEqual(old_dataset, dict_data)
+
+    def test_newhadoop(self):
+        basepath = self.tempdir.name
+        # use custom ArrayWritable types and converters to handle arrays
+        array_data = [(1, array('d')),
+                      (1, array('d', [1.0, 2.0, 3.0])),
+                      (2, array('d', [3.0, 4.0, 5.0]))]
+        self.sc.parallelize(array_data).saveAsNewAPIHadoopFile(
+            basepath + "/newhadoop/",
+            "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.spark.api.python.DoubleArrayWritable",
+            valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
+        result = sorted(self.sc.newAPIHadoopFile(
+            basepath + "/newhadoop/",
+            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.spark.api.python.DoubleArrayWritable",
+            valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
+        self.assertEqual(result, array_data)
+
+        conf = {"mapreduce.outputformat.class" :
+                     "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+                 "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+                 "mapred.output.value.class" : "org.apache.spark.api.python.DoubleArrayWritable",
+                 "mapred.output.dir" : basepath + "/newdataset/"}
+        self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(conf,
+            valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
+        input_conf = {"mapred.input.dir" : basepath + "/newdataset/"}
+        new_dataset = sorted(self.sc.newAPIHadoopRDD(
+            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.spark.api.python.DoubleArrayWritable",
+            valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter",
+            conf=input_conf).collect())
+        self.assertEqual(new_dataset, array_data)
+
+    def test_newolderror(self):
+        basepath = self.tempdir.name
+        rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
+        self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
+            basepath + "/newolderror/saveAsHadoopFile/",
+            "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"))
+        self.assertRaises(Exception, lambda: rdd.saveAsNewAPIHadoopFile(
+            basepath + "/newolderror/saveAsNewAPIHadoopFile/",
+            "org.apache.hadoop.mapred.SequenceFileOutputFormat"))
+
+    def test_bad_inputs(self):
+        basepath = self.tempdir.name
+        rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
+        self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
+            basepath + "/badinputs/saveAsHadoopFile/",
+            "org.apache.hadoop.mapred.NotValidOutputFormat"))
+        self.assertRaises(Exception, lambda: rdd.saveAsNewAPIHadoopFile(
+            basepath + "/badinputs/saveAsNewAPIHadoopFile/",
+            "org.apache.hadoop.mapreduce.lib.output.NotValidOutputFormat"))
+
+    def test_converters(self):
+        # use of custom converters
+        basepath = self.tempdir.name
+        data = [(1, {3.0: u'bb'}),
+                (2, {1.0: u'aa'}),
+                (3, {2.0: u'dd'})]
+        self.sc.parallelize(data).saveAsNewAPIHadoopFile(
+            basepath + "/converters/",
+            "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+            keyConverter="org.apache.spark.api.python.TestOutputKeyConverter",
+            valueConverter="org.apache.spark.api.python.TestOutputValueConverter")
+        converted = sorted(self.sc.sequenceFile(basepath + "/converters/").collect())
+        expected = [(u'1', 3.0),
+                    (u'2', 1.0),
+                    (u'3', 2.0)]
+        self.assertEqual(converted, expected)
+
+    def test_reserialization(self):
+        basepath = self.tempdir.name
+        x = range(1, 5)
+        y = range(1001, 1005)
+        data = zip(x, y)
+        rdd = self.sc.parallelize(x).zip(self.sc.parallelize(y))
+        rdd.saveAsSequenceFile(basepath + "/reserialize/sequence")
+        result1 = sorted(self.sc.sequenceFile(basepath + "/reserialize/sequence").collect())
+        self.assertEqual(result1, data)
+
+        rdd.saveAsHadoopFile(basepath + "/reserialize/hadoop",
+                             "org.apache.hadoop.mapred.SequenceFileOutputFormat")
+        result2 = sorted(self.sc.sequenceFile(basepath + "/reserialize/hadoop").collect())
+        self.assertEqual(result2, data)
+
+        rdd.saveAsNewAPIHadoopFile(basepath + "/reserialize/newhadoop",
+                             "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
+        result3 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newhadoop").collect())
+        self.assertEqual(result3, data)
+
+        conf4 = {
+            "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+            "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+            "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable",
+            "mapred.output.dir" : basepath + "/reserialize/dataset"}
+        rdd.saveAsHadoopDataset(conf4)
+        result4 = sorted(self.sc.sequenceFile(basepath + "/reserialize/dataset").collect())
+        self.assertEqual(result4, data)
+
+        conf5 = {"mapreduce.outputformat.class" :
+                     "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+            "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+            "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable",
+            "mapred.output.dir" : basepath + "/reserialize/newdataset"}
+        rdd.saveAsNewAPIHadoopDataset(conf5)
+        result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect())
+        self.assertEqual(result5, data)
+
+    def test_unbatched_save_and_read(self):
+        basepath = self.tempdir.name
+        ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
+        self.sc.parallelize(ei, numSlices=len(ei)).saveAsSequenceFile(
+            basepath + "/unbatched/")
+
+        unbatched_sequence = sorted(self.sc.sequenceFile(basepath + "/unbatched/",
+            batchSize=1).collect())
+        self.assertEqual(unbatched_sequence, ei)
+
+        unbatched_hadoopFile = sorted(self.sc.hadoopFile(basepath + "/unbatched/",
+            "org.apache.hadoop.mapred.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.Text",
+            batchSize=1).collect())
+        self.assertEqual(unbatched_hadoopFile, ei)
+
+        unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(basepath + "/unbatched/",
+            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.Text",
+            batchSize=1).collect())
+        self.assertEqual(unbatched_newAPIHadoopFile, ei)
+
+        oldconf = {"mapred.input.dir" : basepath + "/unbatched/"}
+        unbatched_hadoopRDD = sorted(self.sc.hadoopRDD(
+            "org.apache.hadoop.mapred.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.Text",
+            conf=oldconf,
+            batchSize=1).collect())
+        self.assertEqual(unbatched_hadoopRDD, ei)
+
+        newconf = {"mapred.input.dir" : basepath + "/unbatched/"}
+        unbatched_newAPIHadoopRDD = sorted(self.sc.newAPIHadoopRDD(
+            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.Text",
+            conf=newconf,
+            batchSize=1).collect())
+        self.assertEqual(unbatched_newAPIHadoopRDD, ei)
+
+    def test_malformed_RDD(self):
+        basepath = self.tempdir.name
+        # non-batch-serialized RDD[[(K, V)]] should be rejected
+        data = [[(1, "a")], [(2, "aa")], [(3, "aaa")]]
+        rdd = self.sc.parallelize(data, numSlices=len(data))
+        self.assertRaises(Exception, lambda: rdd.saveAsSequenceFile(
+            basepath + "/malformed/sequence"))
 
 class TestDaemon(unittest.TestCase):
     def connect(self, port):

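For reference, the round-trip pattern the new tests exercise reduces to the
following minimal sketch. It assumes an already-running SparkContext bound to
`sc` and a scratch output directory (the "/tmp/pyspark-seq" path below is
purely illustrative):

    # Save native Python (int, unicode) pairs as a Hadoop SequenceFile ...
    data = [(1, u'aa'), (2, u'bb'), (3, u'cc')]
    sc.parallelize(data).saveAsSequenceFile("/tmp/pyspark-seq")

    # ... and read them back; the Writable key/value types are inferred.
    assert sorted(sc.sequenceFile("/tmp/pyspark-seq").collect()) == data

    # The same data can also be read through the old-API entry point by
    # passing the input directory in a job conf dict, as test_oldhadoop
    # does above.
    conf = {"mapred.input.dir": "/tmp/pyspark-seq"}
    pairs = sc.hadoopRDD("org.apache.hadoop.mapred.SequenceFileInputFormat",
                         "org.apache.hadoop.io.IntWritable",
                         "org.apache.hadoop.io.Text",
                         conf=conf).collect()
    assert sorted(pairs) == data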