Which version of Spark are you using? inferSchema() was improved in 1.2+ to support empty dicts; could you try the 1.2-RC1?
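For example (untested, and assuming a SparkContext `sc` and a SQLContext `sqlContext` already exist in your session), this is the failure mode:

    rdd = sc.parallelize([
        {'field1': 5, 'field2': 'string', 'field3': {}},
        {'field1': 6, 'field2': 'other', 'field3': {'a': 1}},
    ])
    # Raises ValueError("Can not infer type for empty dict") before 1.2.
    srdd = sqlContext.inferSchema(rdd)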
Also, you can use applySchema():

    from pyspark.sql import *

    fields = [StructField('field1', IntegerType(), True),
              StructField('field2', StringType(), True),
              StructField('field3', MapType(StringType(), IntegerType(), True), True)]
    schema = StructType(fields)
    rdd2 = rdd.map(lambda x: (x['field1'], x['field2'], x['field3']))
    srdd = sqlContext.applySchema(rdd2, schema)

PS: the above code is not tested; see the end of this message for a fuller
(also untested) sketch that keeps the empty dicts instead of dropping them.

Davies

On Thu, Dec 4, 2014 at 4:22 AM, sahanbull <sa...@skimlinks.com> wrote:
> Hi Davies,
>
> Thanks for the reply.
>
> The problem is I have empty dictionaries in my field3 as well. It gives me
> an error:
>
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/root/spark/python/pyspark/sql.py", line 1042, in inferSchema
>     schema = _infer_schema(first)
>   File "/root/spark/python/pyspark/sql.py", line 495, in _infer_schema
>     fields = [StructField(k, _infer_type(v), True) for k, v in items]
>   File "/root/spark/python/pyspark/sql.py", line 460, in _infer_type
>     raise ValueError("Can not infer type for empty dict")
> ValueError: Can not infer type for empty dict
>
> When I remove the empty dictionary items from each record (that is, when
> mapping to the main dictionary, if field3 is an empty dict, I do not
> include it), the record converts from
>
> {
>     field1: 5,
>     field2: 'string',
>     field3: {}
> }
>
> to
>
> {
>     field1: 5,
>     field2: 'string',
> }
>
> At this point, I get:
>
> ERROR TaskSetManager: Task 0 in stage 14.0 failed 4 times; aborting job
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/root/spark/python/pyspark/sql.py", line 1044, in inferSchema
>     return self.applySchema(rdd, schema)
>   File "/root/spark/python/pyspark/sql.py", line 1117, in applySchema
>     rows = rdd.take(10)
>   File "/root/spark/python/pyspark/rdd.py", line 1153, in take
>     res = self.context.runJob(self, takeUpToNumLeft, p, True)
>   File "/root/spark/python/pyspark/context.py", line 770, in runJob
>     it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd,
>          javaPartitions, allowLocal)
>   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
>     line 538, in __call__
>   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>     line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 14.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 14.0 (TID 22628, ip-172-31-30-89.ec2.internal):
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last):
>   File "/root/spark/python/pyspark/worker.py", line 79, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/root/spark/python/pyspark/serializers.py", line 196, in dump_stream
>     self.serializer.dump_stream(self._batched(iterator), stream)
>   File "/root/spark/python/pyspark/serializers.py", line 127, in dump_stream
>     for obj in iterator:
>   File "/root/spark/python/pyspark/serializers.py", line 185, in _batched
>     for item in iterator:
>   File "/root/spark/python/pyspark/rdd.py", line 1148, in takeUpToNumLeft
>     yield next(iterator)
>   File "/root/spark/python/pyspark/sql.py", line 552, in _drop_schema
>     yield converter(i)
>   File "/root/spark/python/pyspark/sql.py", line 540, in nested_conv
>     return tuple(f(v) for f, v in zip(convs, conv(row)))
>   File "/root/spark/python/pyspark/sql.py", line 540, in <genexpr>
>     return tuple(f(v) for f, v in zip(convs, conv(row)))
>   File "/root/spark/python/pyspark/sql.py", line 508, in <lambda>
>     return lambda row: dict((k, conv(v)) for k, v in row.iteritems())
> AttributeError: 'int' object has no attribute 'iteritems'
>
> I am clueless about what to do with this. Hope you can help :)
>
> Many thanks,
> SahanB
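Here is the fuller sketch mentioned above. It is untested and assumes `rdd`
contains dicts shaped like the records you quoted; the idea is to keep
field3 as an empty dict instead of dropping it, so every row has the same
shape as the declared schema and nothing has to be inferred:

    from pyspark.sql import (StructType, StructField, IntegerType,
                             StringType, MapType)

    schema = StructType([
        StructField('field1', IntegerType(), True),
        StructField('field2', StringType(), True),
        StructField('field3', MapType(StringType(), IntegerType()), True),
    ])
    # Normalize field3 to {} when it is missing or empty, so the map
    # converter never sees a mismatched value.
    rdd2 = rdd.map(lambda x: (x['field1'], x['field2'], x.get('field3') or {}))
    srdd = sqlContext.applySchema(rdd2, schema)

The AttributeError above looks like a row/schema mismatch (a map converter
being applied to an int); giving every row the same three fields should
avoid it.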