Re: Using sparkSQL to convert a collection of python dictionary of dictionaries to schema RDD
It worked man.. Thanks a lot :)
Re: Using sparkSQL to convert a collection of python dictionary of dictionaries to schema RDD
Hi Davies,

Thanks for the reply. The problem is I have empty dictionaries in my field3 as well. It gives me an error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/root/spark/python/pyspark/sql.py", line 1042, in inferSchema
    schema = _infer_schema(first)
  File "/root/spark/python/pyspark/sql.py", line 495, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/root/spark/python/pyspark/sql.py", line 460, in _infer_type
    raise ValueError("Can not infer type for empty dict")
ValueError: Can not infer type for empty dict

When I remove the empty dictionary items from each record (that is, when mapping to the main dictionary, if field3 is an empty dict I do not include it), the record converts from

{ field1: 5, field2: 'string', field3: {} }

to

{ field1: 5, field2: 'string' }

At this point, I get:

ERROR TaskSetManager: Task 0 in stage 14.0 failed 4 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/root/spark/python/pyspark/sql.py", line 1044, in inferSchema
    return self.applySchema(rdd, schema)
  File "/root/spark/python/pyspark/sql.py", line 1117, in applySchema
    rows = rdd.take(10)
  File "/root/spark/python/pyspark/rdd.py", line 1153, in take
    res = self.context.runJob(self, takeUpToNumLeft, p, True)
  File "/root/spark/python/pyspark/context.py", line 770, in runJob
    it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 4 times, most recent failure: Lost task 0.3 in stage 14.0 (TID 22628, ip-172-31-30-89.ec2.internal): org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
  File "/root/spark/python/pyspark/worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/root/spark/python/pyspark/serializers.py", line 196, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/root/spark/python/pyspark/serializers.py", line 127, in dump_stream
    for obj in iterator:
  File "/root/spark/python/pyspark/serializers.py", line 185, in _batched
    for item in iterator:
  File "/root/spark/python/pyspark/rdd.py", line 1148, in takeUpToNumLeft
    yield next(iterator)
  File "/root/spark/python/pyspark/sql.py", line 552, in _drop_schema
    yield converter(i)
  File "/root/spark/python/pyspark/sql.py", line 540, in nested_conv
    return tuple(f(v) for f, v in zip(convs, conv(row)))
  File "/root/spark/python/pyspark/sql.py", line 540, in <genexpr>
    return tuple(f(v) for f, v in zip(convs, conv(row)))
  File "/root/spark/python/pyspark/sql.py", line 508, in <lambda>
    return lambda row: dict((k, conv(v)) for k, v in row.iteritems())
AttributeError: 'int' object has no attribute 'iteritems'

I am clueless what to do about this. Hope you can help :)

Many thanks,
SahanB
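PS: the mapping I described above is roughly this (a simplified, untested sketch; the helper name is mine):

# Drop field3 from a record when it is an empty dict, so that
# inferSchema never sees an empty dictionary.
def drop_empty_field3(record):
    if not record.get('field3'):
        return dict((k, v) for k, v in record.items() if k != 'field3')
    return record

cleaned_rdd = rdd.map(drop_empty_field3)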
Re: Using sparkSQL to convert a collection of python dictionary of dictionaries to schema RDD
Which version of Spark are you using? inferSchema() was improved to support empty dicts in 1.2+, could you try the 1.2-RC1?

Also, you can use applySchema():

from pyspark.sql import *

fields = [StructField('field1', IntegerType(), True),
          StructField('field2', StringType(), True),
          StructField('field3', MapType(StringType(), IntegerType()), True)]
schema = StructType(fields)
rdd2 = rdd.map(lambda x: (x['field1'], x['field2'], x['field3']))
sqlContext.applySchema(rdd2, schema)

PS: the above code is not tested.

Davies

On Thu, Dec 4, 2014 at 4:22 AM, sahanbull <sa...@skimlinks.com> wrote:
[...]
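For reference: the AttributeError above most likely means that, once field3 is dropped from some records, the rows no longer all match the single schema inferred from the first record, which an explicit applySchema() avoids. A possible end-to-end version of the untested snippet above, assuming the original goal of saving to Parquet, that rdd holds the dict records, and that sc is an existing SparkContext (the output path is illustrative):

from pyspark.sql import SQLContext, StructType, StructField, IntegerType, StringType, MapType

sqlContext = SQLContext(sc)

fields = [StructField('field1', IntegerType(), True),
          StructField('field2', StringType(), True),
          StructField('field3', MapType(StringType(), IntegerType()), True)]
schema = StructType(fields)

# applySchema() expects records as tuples in field order, not as dicts;
# .get() yields None (a NULL map) for records that are missing field3.
rdd2 = rdd.map(lambda x: (x.get('field1'), x.get('field2'), x.get('field3')))
srdd = sqlContext.applySchema(rdd2, schema)
srdd.saveAsParquetFile('records.parquet')  # SchemaRDD API in Spark 1.x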
Re: Using sparkSQL to convert a collection of python dictionary of dictionaries to schema RDD
inferSchema() will work better than jsonRDD() in your case:

from pyspark.sql import Row

srdd = sqlContext.inferSchema(rdd.map(lambda x: Row(**x)))
srdd.first()
# Row(field1=5, field2='string', field3={'a': 1, 'c': 2})

On Wed, Dec 3, 2014 at 12:11 AM, sahanbull <sa...@skimlinks.com> wrote:

Hi Guys,

I am trying to use SparkSQL to convert an RDD to a SchemaRDD so that I can save it in Parquet format. A record in my RDD (RDD1) has the following format:

{ field1: 5, field2: 'string', field3: {'a': 1, 'c': 2} }

I am using field3 to represent a sparse vector; it can have the keys 'a', 'b', or 'c', with any int values.

The current approach I am using is:

schemaRDD1 = sqc.jsonRDD(RDD1.map(lambda x: simplejson.dumps(x)))

But when I do this, the dictionary in field3 also gets converted to a SparkSQL Row. This makes field3 a dense data structure, holding the value None for every key that is not present in field3 for each record.

When I do

test = RDD1.map(lambda x: simplejson.dumps(x))
test.first()

my output is:

{"field1": 5, "field2": "string", "field3": {"a": 1, "c": 2}}

But then when I do

schemaRDD = sqc.jsonRDD(test)
schemaRDD.first()

my output is:

Row(field1=5, field2='string', field3=Row(a=1, b=None, c=2))

In reality, I have thousands of possible keys in field3 and only 2 to 3 of them occur per record, so when it converts to a Row it generates thousands of None fields per record. Is there any way for me to store field3 as a dictionary instead of converting it into a Row in the SchemaRDD?
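For readers finding this thread later, a minimal self-contained version of the inferSchema() suggestion (an untested sketch; assumes the Spark 1.x Python API, where inferSchema lives on SQLContext; in Spark 1.3+ the equivalent is sqlContext.createDataFrame):

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext('local', 'dict-to-schemardd')
sqc = SQLContext(sc)

records = [{'field1': 5, 'field2': 'string', 'field3': {'a': 1, 'c': 2}},
           {'field1': 7, 'field2': 'other', 'field3': {'b': 3}}]
rdd = sc.parallelize(records)

# Row(**x) keeps field3 as a plain Python dict, so the inferred schema
# uses MapType instead of a Struct with one column per possible key,
# and sparse records stay sparse.
srdd = sqc.inferSchema(rdd.map(lambda x: Row(**x)))
srdd.first()   # Row(field1=5, field2='string', field3={'a': 1, 'c': 2})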