RE: Best way to read XML data from RDD

2016-08-22 Thread Puneet Tripathi
I was building a small app to stream messages from Kafka via Spark. Each 
message is a separate XML document. I wrote a simple app to do so (this app 
expects each XML document to be on a single line):

from __future__ import print_function
from pyspark.sql import Row
import xml.etree.ElementTree as ET
import sys
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

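## This is where you parse the XML

def create_dict(rt, parent_tag=None, result=None):
    # NOTE: the body of this function was truncated in the original post.
    # The recursion below is a reconstruction (an assumption): it flattens
    # the tree into {tag: text}, prefixing nested tags with the parent tag.
    if result is None:
        result = {}
    for child in rt:
        tag = child.tag if parent_tag is None else parent_tag + '_' + child.tag
        if len(child):  # element has children (getchildren() is deprecated)
            create_dict(child, tag, result)
        else:
            result[tag] = child.text
    return result

def parse_xml_to_row(xmlString):
    # Parse one single-line XML message and turn its fields into a Row
    root = ET.fromstring(xmlString.encode('utf-8'))
    dct = create_dict(root)
    return Row(**dct)

def toCSVLine(data):
    return ','.join(str(d) for d in data)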

def toCSVLine(data):
return ','.join(str(d) for d in data)

## Parsing code part ends here

# Configure Spark
conf = SparkConf().setAppName("PythonStreamingKafkaWordCount")
conf = conf.setMaster("local[*]")
sc   = SparkContext(conf=conf)

ssc = StreamingContext(sc, 10)

zkQuorum, topic = 'localhost:2182', 'topic-name'
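kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer",
                              {topic: 1})
# Each Kafka record is a (key, value) pair; keep only the value (the XML string)
lines = kvs.map(lambda x: x[1]).map(parse_xml_to_row).map(toCSVLine)
# lines.pprint()
lines.saveAsTextFiles('where you want to write the file ')

# Start the streaming job and block until it is stopped
ssc.start()
ssc.awaitTermination()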
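
The parsing step can be tried locally without Spark or Kafka. The following is 
a minimal sketch, assuming the intent is to flatten each XML message into 
tag/text pairs. The names flatten and to_csv_line, the underscore-joined keys, 
and the sample message are illustrative assumptions, not part of the original 
app.

```python
import xml.etree.ElementTree as ET

def flatten(element, prefix=None, out=None):
    """Collect leaf text into a dict keyed by underscore-joined tag paths."""
    if out is None:
        out = {}
    for child in element:
        key = child.tag if prefix is None else prefix + "_" + child.tag
        if len(child):          # child has sub-elements: recurse into it
            flatten(child, key, out)
        else:                   # leaf: store its text
            out[key] = child.text
    return out

def to_csv_line(values):
    return ",".join(str(v) for v in values)

# Hypothetical single-line message, as the app expects.
sample = ("<order><id>42</id>"
          "<customer><name>Ann</name><city>Pune</city></customer></order>")
fields = flatten(ET.fromstring(sample))
# Sort the keys so the column order is stable across messages.
csv_line = to_csv_line(fields[k] for k in sorted(fields))
```

Note that in this sketch repeated sibling tags overwrite one another, so it 
only suits messages whose leaf tags are unique within their parent.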


Hope this is helpful.


From: Hyukjin Kwon
Sent: Monday, August 22, 2016 4:34 PM
To: Diwakar Dhanuskodi
Cc: Darin McBeath; Jörn Franke; Felix Cheung; user
Subject: Re: Best way to read XML data from RDD

Would you mind sharing your code and some sample data? It should be okay with 
single-line XML, if I remember correctly.

2016-08-22 19:53 GMT+09:00 Diwakar Dhanuskodi:
Hi Darin,

Are you using this utility to parse single-line XML?


 Original message 
From: Darin McBeath
Date: 21/08/2016 17:44 (GMT+05:30)
To: Hyukjin Kwon, Jörn Franke
Cc: Diwakar Dhanuskodi, Felix Cheung, user
Subject: Re: Best way to read XML data from RDD

Another option would be to look at spark-xml-utils.  We use this extensively in 
the manipulation of our XML content.

There are quite a few examples.  Depending on your preference (and what you 
want to do), you could use xpath, xquery, or xslt to transform, extract, or 

As mentioned below, you will want to initialize the parser in a mapPartitions 
call (one of the examples shows this).
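
The per-partition idea can be sketched in plain Python. This is only an 
illustration under stated assumptions: FieldExtractor is a hypothetical 
stand-in for a processor that is costly to build (it is not the 
spark-xml-utils API), and a list iterator stands in for one RDD partition.

```python
import xml.etree.ElementTree as ET

class FieldExtractor(object):
    """Hypothetical stand-in for a parser/processor that is costly to build."""
    instances = 0  # counts how many times the "expensive" setup ran

    def __init__(self, path):
        FieldExtractor.instances += 1
        self.path = path  # ElementTree path expression, e.g. "./title"

    def extract(self, xml_string):
        root = ET.fromstring(xml_string)
        return root.findtext(self.path)

def extract_partition(records):
    # Built once per partition, then reused for every record in it.
    extractor = FieldExtractor("./title")
    for record in records:
        yield extractor.extract(record)

# Local stand-in for one partition; with Spark this would be
# rdd.mapPartitions(extract_partition).
partition = iter(["<doc><title>a</title></doc>",
                  "<doc><title>b</title></doc>"])
titles = list(extract_partition(partition))
```

With a plain map() the extractor would be constructed once per record; here 
the setup cost is paid once per partition regardless of how many records the 
partition holds.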

Hope this is helpful.


From: Hyukjin Kwon
To: Jörn Franke
Cc: Diwakar Dhanuskodi; Felix Cheung; user
Sent: Sunday, August 21, 2016 6:10 AM
Subject: Re: Best way to read XML data from RDD

Hi Diwakar,

The Spark XML library can take an RDD as its source.

import com.databricks.spark.xml.XmlReader

// rdd is an RDD[String] holding the XML
val df = new XmlReader()
  .xmlRdd(sqlContext, rdd)

If performance is critical, I would also recommend taking care over the 
creation and destruction of the parser.

If the parser is not serializable, you can create it once per partition 
within mapPartitions, just like

I hope this is helpful.

2016-08-20 15:10 GMT+09:00 Jörn Franke 

I fear the issue is that this will create and destroy an XML parser object 2 
million times, which is very inefficient; it does not really look like a parser 
performance issue. Can't you do something about the choice of format? Could you 
ask your supplier to deliver another format (ideally Avro or something similar)?
Otherwise you could create just one XML parser object per node, but sharing it 
among the parallel tasks on the same node is tricky.
The other possibility could be simply more 

writing Kafka dstream to local flat file

2016-07-21 Thread Puneet Tripathi
Hi, I am trying to consume from Kafka topics following Approach 1 
(createStream). I am not able to write the stream to a local text file using 
the saveAsTextFiles() function. Below is the code:

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 1)
zkQuorum, topic = 'localhost:9092', 'python-kafka'

kafka_stream = KafkaUtils.createStream(ssc, zkQuorum, None, {topic: 1})
lines = kafka_stream.map(lambda x: x[1])


When I run the console consumer, I get the following output:

[puneett@gb-slo-svb-0255 ~]$ 
/nfs/science/shared/kafka/kafka/bin/ --topic 
python-kafka --property schema.registry.url="http://localhost:9092" --zookeeper 
localhost:2182 --from-beginning
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
SLF4J: Found binding in 
SLF4J: Found binding in 
SLF4J: Found binding in 
SLF4J: Found binding in 
SLF4J: Found binding in 
SLF4J: See for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
There are 1 more similar kafka test message produced
There are 1 more similar kafka test message produced
There are 1 more similar kafka test message produced
There are 1 more similar kafka test message produced
There are 1 more similar kafka test message produced
There are 1 more similar kafka test message produced
There are 1 more similar kafka test message produced

Can someone please suggest what I am doing wrong?
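
One thing that may be worth checking, judging only from the values in this 
post: KafkaUtils.createStream takes the ZooKeeper quorum, and the console 
consumer above reaches ZooKeeper at localhost:2182, while the code passes 
localhost:9092. The code shown also never calls saveAsTextFiles() or 
ssc.start(). A minimal sketch under those assumptions follows; the function 
name build_stream, the group id, and the output prefix are hypothetical, and 
pyspark.streaming.kafka is the Spark 1.x/2.x module used in this thread.

```python
def build_stream(sc, zk_quorum="localhost:2182", topic="python-kafka",
                 group_id="spark-streaming-consumer",
                 out_prefix="file:///tmp/kafka-out"):
    """Wire Kafka -> text files; returns the StreamingContext, not yet started."""
    # Imported here so the function can be defined without Spark installed.
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    ssc = StreamingContext(sc, 1)  # 1-second batches, as in the post
    stream = KafkaUtils.createStream(ssc, zk_quorum, group_id, {topic: 1})
    lines = stream.map(lambda kv: kv[1])  # keep the message value only
    lines.saveAsTextFiles(out_prefix)     # one output directory per batch
    return ssc

# Usage (inside a pyspark shell or job where `sc` exists):
#   ssc = build_stream(sc)
#   ssc.start()
#   ssc.awaitTermination()
```

Nothing is written until ssc.start() is called, so the output directories 
only appear once the context is running and batches arrive.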
RE: Spark with HBase Error - Py4JJavaError

2016-07-08 Thread Puneet Tripathi
Hi Ram, thanks very much, it worked.


From: ram kumar
Sent: Thursday, July 07, 2016 6:51 PM
To: Puneet Tripathi
Subject: Re: Spark with HBase Error - Py4JJavaError

Hi Puneet,
Have you tried appending
 --jars $SPARK_HOME/lib/spark-examples-*.jar
to the execution command?

On Thu, Jul 7, 2016 at 5:19 PM, Puneet Tripathi wrote:
Guys, can anyone please help with the issue below?


From: Puneet Tripathi 
Sent: Thursday, July 07, 2016 12:42 PM
Subject: Spark with HBase Error - Py4JJavaError


We are running HBase in fully distributed mode. I tried to connect to HBase via 
pyspark and then write to HBase using saveAsNewAPIHadoopDataset, but it failed. 
The error says:

Py4JJavaError: An error occurred while calling 
: java.lang.ClassNotFoundException: 

I have been able to create pythonconverters.jar and then did the following:

1.  I think we have to copy this to a location on HDFS; /sparkjars/ seems as 
good a directory to create as any. I think the file has to be world-readable.

2.  Set the spark_jar_hdfs_path property in Cloudera Manager, e.g. 

It still doesn't seem to work. Can someone please help me with this?

