PySpark Nested Json Parsing

2015-07-20 Thread Ajay
Hi,

I am new to Apache Spark, and I am trying to parse nested JSON using PySpark.
Here is the code with which I am trying to parse the JSON.
I am using Apache Spark 1.2.0 from Cloudera CDH 5.3.2.

lines = sc.textFile(inputFile)

import json

def func(x):
    json_str = json.loads(x)
    if json_str['label']:
        if json_str['label']['label2']:
            return (1,1)
    return (0,1)

lines.map(func).reduceByKey(lambda a,b: a + b).saveAsTextFile(outputFile)

I am getting the following error:
ERROR [Executor task launch worker-13] executor.Executor (Logging.scala:logError(96)) - Exception in task 1.0 in stage 14.0 (TID 25)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py", line 107, in main
    process()
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py", line 98, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2073, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2073, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 247, in func
    return f(iterator)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 1561, in combineLocally
    merger.mergeValues(iterator)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py", line 252, in mergeValues
    for k, v in iterator:
  File "<stdin>", line 2, in func
  File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
    return _default_decoder.decode(s)
  File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib64/python2.6/json/decoder.py", line 336, in raw_decode
    obj, end = self._scanner.iterscan(s, **kw).next()
  File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
    rval, next_pos = action(m, context)
  File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
    value, end = iterscan(s, idx=end, context=context).next()
  File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
    rval, next_pos = action(m, context)
  File "/usr/lib64/python2.6/json/decoder.py", line 183, in JSONObject
    value, end = iterscan(s, idx=end, context=context).next()
  File "/usr/lib64/python2.6/json/scanner.py", line 55, in iterscan
    rval, next_pos = action(m, context)
  File "/usr/lib64/python2.6/json/decoder.py", line 155, in JSONString
    return scanstring(match.string, match.end(), encoding, strict)
ValueError: Invalid \escape: line 1 column 855 (char 855)

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
2015-07-20 09:58:24,727 ERROR [Executor task launch worker-12] executor.Executor (Logging.scala:logError(96)) - Exception in task 0.0 in stage 14.0 (TID 24)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py", line 107, in main
    process()
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py", line 98, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2073, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2073, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 247, in func
    return f(iterator)
  File

Re: PySpark Nested Json Parsing

2015-07-20 Thread Davies Liu
Could you try SQLContext.read.json()?
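
For example, a minimal sketch (hedged: the read.json() entry point only
exists from Spark 1.4 on; on 1.2/1.3 the equivalent call is
sqlContext.jsonFile(path)):

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)           # assumes an existing SparkContext `sc`
df = sqlContext.read.json(inputFile)  # `inputFile` as in the original post
df.printSchema()                      # the nested schema is inferred per record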

On Mon, Jul 20, 2015 at 9:06 AM, Davies Liu dav...@databricks.com wrote:
 Before reading the JSON file as a text file, can you make sure that each
 JSON string fits on one line? textFile() splits the file on '\n', so a
 record that spans lines will not parse.

 On Mon, Jul 20, 2015 at 3:26 AM, Ajay ajay0...@gmail.com wrote:
 [...]

Re: PySpark Nested Json Parsing

2015-07-20 Thread Davies Liu
Before reading the JSON file as a text file, can you make sure that each
JSON string fits on one line? textFile() splits the file on '\n', so a
record that spans lines will not parse.
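
A quick way to check is to count the lines that do not parse on their own
(a sketch, assuming `lines` is the RDD from the original post):

import json

def is_bad(line):
    # In Python 2, json.loads() raises ValueError when a line is not a
    # complete JSON document (e.g. a record that was split across lines).
    try:
        json.loads(line)
        return False
    except ValueError:
        return True

print(lines.filter(is_bad).count())  # anything > 0 means records span lines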

On Mon, Jul 20, 2015 at 3:26 AM, Ajay ajay0...@gmail.com wrote:
 [...]

Re: PySpark Nested Json Parsing

2015-07-20 Thread Naveen Madhire
I had a similar issue with Spark 1.3.
After migrating to Spark 1.4 and using sqlContext.read.json, it worked well.
I think you can look at the DataFrame select and explode options to read
the nested JSON elements, arrays, etc.
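
Something along these lines (a sketch for Spark 1.4+; the nested field
label.label2 comes from the original post, while the array column name
`items` is hypothetical):

from pyspark.sql import SQLContext
from pyspark.sql.functions import explode

sqlContext = SQLContext(sc)
df = sqlContext.read.json(inputFile)
df.select("label.label2").show()     # dotted paths select nested struct fields
df.select(explode(df.items)).show()  # explode() yields one row per array element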

Thanks.


On Mon, Jul 20, 2015 at 11:07 AM, Davies Liu dav...@databricks.com wrote:

 Could you try SQLContext.read.json()?

 On Mon, Jul 20, 2015 at 9:06 AM, Davies Liu dav...@databricks.com wrote:
  [...]