PySpark Nested Json Parsing
Hi, I am new to Apache Spark and am trying to parse nested JSON with PySpark. I am using Apache Spark 1.2.0 on Cloudera CDH 5.3.2. Here is the code I am using to parse the JSON:

    lines = sc.textFile(inputFile)

    import json

    def func(x):
        json_str = json.loads(x)
        if json_str['label']:
            if json_str['label']['label2']:
                return (1, 1)
        return (0, 1)

    lines.map(func).reduceByKey(lambda a, b: a + b).saveAsTextFile(outputFile)

I am getting the following error:

    ERROR [Executor task launch worker-13] executor.Executor (Logging.scala:logError(96)) - Exception in task 1.0 in stage 14.0 (TID 25)
    org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py", line 107, in main
        process()
      File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py", line 98, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2073, in pipeline_func
        return func(split, prev_func(split, iterator))
      File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2073, in pipeline_func
        return func(split, prev_func(split, iterator))
      File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 247, in func
        return f(iterator)
      File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 1561, in combineLocally
        merger.mergeValues(iterator)
      File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py", line 252, in mergeValues
        for k, v in iterator:
      File "<stdin>", line 2, in func
      File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
        return _default_decoder.decode(s)
      File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
        obj, end = self.raw_decode(s, idx=_w(s, 0).end())
      File "/usr/lib64/python2.6/json/decoder.py", line 336, in raw_decode
        obj, end = self._scanner.iterscan(s, **kw).next()
      File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
        rval, next_pos = action(m, context)
      File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
        value, end = iterscan(s, idx=end, context=context).next()
      File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
        rval, next_pos = action(m, context)
      File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
        value, end = iterscan(s, idx=end, context=context).next()
      File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
        rval, next_pos = action(m, context)
      File "/usr/lib64/python2.6/json/decoder.py", line 155, in JSONString
        return scanstring(match.string, match.end(), encoding, strict)
    ValueError: Invalid \escape: line 1 column 855 (char 855)

        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
        at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

    2015-07-20 09:58:24,727 ERROR [Executor task launch worker-12] executor.Executor (Logging.scala:logError(96)) - Exception in task 0.0 in stage 14.0 (TID 24)
    org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      ...
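The ValueError above is raised by json.loads when it hits a malformed record (an invalid backslash escape in the input, not a Spark problem). As a stdlib-only sketch of how the parse function could tolerate such lines instead of killing the whole task — the helper name and sample records below are invented for illustration:

```python
import json

def parse_label(line):
    """Classify one JSON record, tolerating malformed input.

    Returns (1, 1) if the nested label is present, (0, 1) if not,
    and None for lines that do not parse as JSON.
    """
    try:
        record = json.loads(line)
    except ValueError:  # also covers the "Invalid \escape" failure
        return None
    label = record.get('label') or {}
    if label.get('label2'):
        return (1, 1)
    return (0, 1)

lines = [
    '{"label": {"label2": "x"}}',  # nested label present
    '{"label": {}}',               # label present but empty
    'not valid json {',            # malformed record
]
results = [parse_label(line) for line in lines]
```

In the Spark pipeline the None results would need to be filtered out (e.g. with a filter before reduceByKey) rather than returned, but the error-handling idea is the same.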
Re: PySpark Nested Json Parsing
Could you try SQLContext.read.json()?

On Mon, Jul 20, 2015 at 9:06 AM, Davies Liu dav...@databricks.com wrote:
> Before using the json file as text file, can you make sure that each
> json string can fit in one line? Because textFile() will split the
> file by '\n'
>
> On Mon, Jul 20, 2015 at 3:26 AM, Ajay ajay0...@gmail.com wrote:
>> Hi, I am new to Apache Spark. I am trying to parse nested json using
>> pyspark. [...]
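Whichever API is used, it can help to first locate the offending record that triggers the "Invalid \escape ... (char 855)" error. A stdlib-only sketch (the function name and sample data are invented for illustration):

```python
import json

def find_bad_records(lines):
    # Report (line_number, error_message) for every line that fails to
    # parse, mirroring the "Invalid \escape: ... (char N)" diagnostic.
    bad = []
    for lineno, line in enumerate(lines, start=1):
        try:
            json.loads(line)
        except ValueError as err:
            bad.append((lineno, str(err)))
    return bad

sample = [
    '{"label": {"label2": "x"}}',
    '{"path": "C:\\q"}',  # \q is not a valid JSON escape sequence
]
bad = find_bad_records(sample)
```

Running the same check over the raw input file (outside Spark, or via a small validation map) pinpoints exactly which records need to be fixed or filtered.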
Re: PySpark Nested Json Parsing
Before using the JSON file as a text file, can you make sure that each JSON string fits on a single line? textFile() will split the file by '\n'.

On Mon, Jul 20, 2015 at 3:26 AM, Ajay ajay0...@gmail.com wrote:
> Hi, I am new to Apache Spark. I am trying to parse nested json using
> pyspark. [...]
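The one-record-per-line requirement can be seen with the standard json module alone. This small sketch (sample document invented for illustration) shows why a pretty-printed JSON record breaks line-at-a-time parsing, which is exactly what sc.textFile() plus json.loads does:

```python
import json

# A single JSON document pretty-printed across several lines, as it
# might appear in a file that sc.textFile() splits on '\n'.
pretty = '{\n  "label": {\n    "label2": "x"\n  }\n}'

# Parsing each line independently fails: no single line is a complete
# JSON document.
per_line_ok = []
for line in pretty.split('\n'):
    try:
        json.loads(line)
        per_line_ok.append(True)
    except ValueError:
        per_line_ok.append(False)

# Parsing the whole document at once succeeds.
whole = json.loads(pretty)
```

This is why JSON files fed to textFile() are conventionally stored as "JSON Lines": one complete document per line.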
Re: PySpark Nested Json Parsing
I had a similar issue with Spark 1.3. After migrating to Spark 1.4 and using sqlContext.read.json, it worked well. I think you can look at the DataFrame select and explode options to read the nested JSON elements, arrays, etc.

Thanks.

On Mon, Jul 20, 2015 at 11:07 AM, Davies Liu dav...@databricks.com wrote:
> Could you try SQLContext.read.json()?
>
> On Mon, Jul 20, 2015 at 9:06 AM, Davies Liu dav...@databricks.com wrote:
>> Before using the json file as text file, can you make sure that each
>> json string can fit in one line? Because textFile() will split the
>> file by '\n'
>>
>> On Mon, Jul 20, 2015 at 3:26 AM, Ajay ajay0...@gmail.com wrote:
>>> Hi, I am new to Apache Spark. I am trying to parse nested json using
>>> pyspark. [...]
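What select plus explode do over a nested record can be mimicked in plain Python; the field names (user, events, kind) are invented for illustration, and in PySpark 1.4+ the equivalent would be roughly df.select(df.user, explode(df.events)) using pyspark.sql.functions.explode:

```python
import json

# One record whose nested array we "explode" into one output row per
# array element -- a plain-Python analogue of Spark SQL's explode().
raw = '{"user": "u1", "events": [{"kind": "a"}, {"kind": "b"}]}'
record = json.loads(raw)

rows = [
    {"user": record["user"], "kind": event["kind"]}
    for event in record.get("events", [])
]
```

Each element of the nested array becomes its own row, with the top-level fields repeated alongside it — the same shape a select + explode query produces.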