Re: Spark Python with SequenceFile containing numpy deserialized data in str form
Hi,

I saw the posting about storing NumPy values in sequence files:
http://mail-archives.us.apache.org/mod_mbox/spark-user/201506.mbox/%3cCAJQK-mg1PUCc_hkV=q3n-01ioq_pkwe1g-c39ximco3khqn...@mail.gmail.com%3e

I’ve had a go at implementing this and opened a pull request at
https://github.com/apache/spark/pull/8384

If you are interested in this functionality, please check it out. I’m keen to get feedback on it.

Thanks,
Peter
Re: Spark Python with SequenceFile containing numpy deserialized data in str form
Update: Converting the value to a bytearray before storing it in the RDD is not a solution either. This is what happens when I try to read the RDD after the value was stored as a Python bytearray:

    Traceback (most recent call last):
      File "/vagrant/python/kmeans.py", line 24, in <module>
        features = sc.sequenceFile(feature_sequencefile_path)
      File "/usr/local/spark/python/pyspark/context.py", line 490, in sequenceFile
        keyConverter, valueConverter, minSplits, batchSize)
      File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
      File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.sequenceFile.
    : org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/tmp/feature-bytearray
        at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
        at org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:45)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1156)
        at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:205)
        at org.apache.spark.api.python.PythonRDD$.sequenceFile(PythonRDD.scala:447)
        at org.apache.spark.api.python.PythonRDD.sequenceFile(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)

On Tue, Jun 9, 2015 at 11:04 AM, Sam Stoelinga sammiest...@gmail.com wrote:

Hi all,

I'm storing an RDD as a sequence file with the following content:

    key = filename (string)
    value = Python str from numpy.savez (not unicode)

To make sure the whole NumPy array gets stored, I first have to serialize it with:

    import io
    import numpy as np

    def serialize_numpy_array(numpy_array):
        # Write the compressed .npz archive into an in-memory buffer
        output = io.BytesIO()
        np.savez_compressed(output, x=numpy_array)
        return output.getvalue()

    type(output.getvalue())
    str

The serialization returns a Python str, *not a unicode object*. After serialization I call

    my_dersialized_numpy_rdd.saveAsSequenceFile(path)

All works well and the RDD gets stored successfully. The problem starts when I want to read the sequence file again:

    my_dersialized_numpy_rdd = sc.sequenceFile(path)
    first = my_dersialized_numpy_rdd.first()
    type(first[1])
    unicode

The previous str became a unicode object after we stored it to a sequence file and read it again. Trying to convert it back with first[1].decode('ascii') fails with

    UnicodeEncodeError: 'ascii' codec can't encode characters in position 1-3: ordinal not in range(128)

My expectation was that I would get the data back the way I stored it, that is, as str and not as unicode. Does anybody have a suggestion for how I can read back the original data? I will try converting the str to a bytearray before storing it to a sequence file.

Thanks,
Sam Stoelinga
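
One possible workaround, which is not proposed anywhere in this thread, is to make the value ASCII-only before it is written, so that it no longer matters whether the reader hands it back as str or unicode. The sketch below is written for Python 3 and uses only the standard library. The helper names to_text_safe and from_text_safe are hypothetical, and the zlib payload is only a stand-in for the bytes returned by serialize_numpy_array. It assumes the binary value is being stored as text, which is what the str-to-unicode change suggests.

```python
import base64
import zlib


def to_text_safe(payload: bytes) -> str:
    """Encode raw bytes as ASCII-only text (hypothetical helper)."""
    return base64.b64encode(payload).decode("ascii")


def from_text_safe(text: str) -> bytes:
    """Recover the original bytes from the base64 text (hypothetical helper)."""
    return base64.b64decode(text.encode("ascii"))


# Stand-in for the output of serialize_numpy_array(): compressed binary
# data. Real code would pass output.getvalue() here instead.
payload = zlib.compress(bytes(range(256)) * 4)

stored = to_text_safe(payload)     # the value that would be written as text
restored = from_text_safe(stored)  # what the reading side would call

# The round trip must give back exactly the bytes that went in.
assert restored == payload
```

The cost is roughly a one-third increase in stored size, and any non-Python reader has to apply the same base64 decoding step before parsing the payload.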
Re: Spark Python with SequenceFile containing numpy deserialized data in str form
Update: As a workaround I now use saveAsPickleFile instead, which handles everything correctly: the data stays in byte format. I noticed that things get messy because str and bytes are the same type in Python 2.7, and I wonder whether Python 3 would have the same problem.

I would still prefer a SequenceFile, which is usable across languages, over a pickle file, so if anybody has pointers I would appreciate that :)

On Tue, Jun 9, 2015 at 11:35 AM, Sam Stoelinga sammiest...@gmail.com wrote:

Update: Using bytearray before storing to RDD is not a solution either. [...]