RE: Best way to read XML data from RDD
I was building a small app to stream messages from Kafka via Spark. Each message is a new XML document. I wrote a simple app to do this (it expects each XML document to be on a single line):

from __future__ import print_function
from pyspark.sql import Row
import xml.etree.ElementTree as ET
import sys
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

## This is where you parse the XML
dict = {}

def create_dict(rt, new=None):
    global parent_tag
    for child in rt:
        if new is None:
            parent_tag = child.tag
        else:
            parent_tag = parent_tag
        # len(child) is the number of sub-elements (getchildren() is deprecated)
        if len(child):
            create_dict(child, parent_tag)
        else:
            # if child.tag in dict.keys():
            #     tag = tag + child.tag
            # else:
            #     tag = child.tag
            dict[parent_tag] = child.text
    return dict

def parse_xml_to_row(xmlString):
    dct = {}
    root = ET.fromstring(xmlString.encode('utf-8'))
    dct = create_dict(root)
    return Row(**dct)

def toCSVLine(data):
    return ','.join(str(d) for d in data)
## Parsing code part ends here

# sc.stop()
# Configure Spark
conf = SparkConf().setAppName("PythonStreamingKafkaWordCount")
conf = conf.setMaster("local[*]")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 10)

zkQuorum, topic = 'localhost:2182', 'topic-name'
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1]).map(parse_xml_to_row).map(toCSVLine)
# lines.pprint()
lines.saveAsTextFiles('where you want to write the file ')

ssc.start()
ssc.awaitTerminationOrTimeout(50)
ssc.stop()

Hope this is helpful.
Puneet

From: Hyukjin Kwon [mailto:gurwls...@gmail.com]
Sent: Monday, August 22, 2016 4:34 PM
To: Diwakar Dhanuskodi
Cc: Darin McBeath; Jörn Franke; Felix Cheung; user
Subject: Re: Best way to read XML data from RDD

Do you mind sharing your code and sample data? It should be okay with a single XML if I remember this correctly.
2016-08-22 19:53 GMT+09:00 Diwakar Dhanuskodi:

Hi Darin,
Are you using this utility to parse single line XML?

Sent from Samsung Mobile.

Original message
From: Darin McBeath
Date: 21/08/2016 17:44 (GMT+05:30)
To: Hyukjin Kwon, Jörn Franke
Cc: Diwakar Dhanuskodi, Felix Cheung, user
Subject: Re: Best way to read XML data from RDD

Another option would be to look at spark-xml-utils. We use this extensively in the manipulation of our XML content.

https://github.com/elsevierlabs-os/spark-xml-utils

There are quite a few examples. Depending on your preference (and what you want to do), you could use xpath, xquery, or xslt to transform, extract, or filter.

As mentioned below, you want to initialize the parser in a mapPartitions call (one of the examples shows this).

Hope this is helpful.
Darin.

From: Hyukjin Kwon
To: Jörn Franke
Cc: Diwakar Dhanuskodi; Felix Cheung; user
Sent: Sunday, August 21, 2016 6:10 AM
Subject: Re: Best way to read XML data from RDD

Hi Diwakar,
The Spark XML library can take an RDD as its source.

```
val df = new XmlReader()
  .withRowTag("book")
  .xmlRdd(sqlContext, rdd)
```

If performance is critical, I would also recommend taking care of the creation and destruction of the parser. If the parser is not serializable, then you can create it once per partition within mapPartitions, just like

https://github.com/apache/spark/blob/ac84fb64dd85257da06f93a48fed9bb188140423/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L322-L325

I hope this is helpful.

2016-08-20 15:10 GMT+09:00 Jörn Franke:

I fear the issue is that this will create and destroy an XML parser object 2 million times, which is very inefficient - it does not really look like a parser performance issue. Can't you do something about the format choice? Ask your supplier to deliver another format (ideally Avro or something similar)?
>Otherwise you could just create one XML Parser object / node, but sharing this among the parallel tasks on the same node is tricky.
>The other possibility could be simply more
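For the Python app at the top of this message, here is a minimal sketch of the parsing step written as a per-partition generator, so that any expensive setup could happen once per partition as suggested above. The names flatten_xml and parse_partition are hypothetical, and the underscore-joined key scheme is an assumption. Python's ElementTree has no costly parser object to share, so the setup comment is only a placeholder for libraries that do.

```python
import xml.etree.ElementTree as ET


def flatten_xml(xml_string):
    """Return {path: text} for every leaf element of one XML document.

    Keys are the tag names from below the root down to the leaf, joined
    with "_". Repeated sibling paths overwrite each other (last one wins).
    No module-level state is used, so messages cannot leak into each other.
    """
    root = ET.fromstring(xml_string)
    flat = {}

    def walk(node, path):
        children = list(node)
        if not children:
            flat[path] = node.text
            return
        for child in children:
            walk(child, path + "_" + child.tag)

    for child in root:
        walk(child, child.tag)
    return flat


def parse_partition(xml_strings):
    """Generator intended for mapPartitions: one dict per XML string."""
    # Any per-partition setup (for example, building a reusable parser
    # in a library that supports one) would go here, before the loop.
    for xml_string in xml_strings:
        yield flatten_xml(xml_string)


if __name__ == "__main__":
    sample = "<msg><id>7</id><user><name>ann</name></user></msg>"
    print(list(parse_partition([sample])))
```

With an RDD of XML strings this would be called as rdd.mapPartitions(parse_partition). The resulting dicts can then be turned into Rows or CSV lines as in the original app.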
writing Kafka dstream to local flat file
Hi,
I am trying to consume from Kafka topics following http://spark.apache.org/docs/latest/streaming-kafka-integration.html Approach 1 (createStream). I am not able to write the stream to a local text file using the saveAsTextFiles() function. Below is the code:

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 1)
zkQuorum, topic = 'localhost:9092', 'python-kafka'
kafka_stream = KafkaUtils.createStream(ssc, zkQuorum,None, {topic: 1})
lines = kafka_stream.map(lambda x: x[1])
kafka_stream.saveAsTextFiles('file:///home/puneett/')

When I access the consumer I get the following output:

[puneett@gb-slo-svb-0255 ~]$ /nfs/science/shared/kafka/kafka/bin/kafka-console-consumer.sh --topic python-kafka --property schema.registry.url="http://localhost:9092" --zookeeper localhost:2182 --from-beginning
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/core/build/dependant-libs-2.10.6/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/tools/build/dependant-libs-2.10.6/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
There are 1 more similar kafka test message produced
There are 1 more similar kafka test message produced
There are 1 more similar kafka test message produced
There are 1 more similar kafka test message produced
There are 1 more similar kafka test message produced
There are 1 more similar kafka test message produced
There are 1 more similar kafka test message produced
There

Please can someone suggest what I am doing wrong?

Regards,
Puneet

dunnhumby limited is a limited company registered in England and Wales with registered number 02388853 and VAT registered number 927 5871 83. Our registered office is at Aurora House, 71-75 Uxbridge Road, London W5 5SL. The contents of this message and any attachments to it are confidential and may be legally privileged. If you have received this message in error you should delete it from your system immediately and advise the sender. dunnhumby may monitor and record all emails. The views expressed in this email are those of the sender and not those of dunnhumby.
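Two things may be worth checking in the snippet in this message. As posted, it never calls ssc.start(), and it saves kafka_stream rather than lines. Either could explain the missing output, though the full script was not shown. Separately, saveAsTextFiles treats its argument as a prefix rather than a directory: to my understanding each batch goes to a directory named prefix-TIME_IN_MS[.suffix]. The helper below only mirrors that naming rule in plain Python, so the effect of a trailing slash is visible. batch_output_path is a hypothetical name, and the timestamp and the kafka-out prefix are made up.

```python
def batch_output_path(prefix, batch_time_ms, suffix=None):
    """Mimic the 'prefix-TIME_IN_MS[.suffix]' naming used for each batch."""
    path = prefix + "-" + str(batch_time_ms)
    if suffix is not None:
        path += "." + suffix
    return path


if __name__ == "__main__":
    # Prefix ending in "/": the batch directory name starts with "-".
    print(batch_output_path("file:///home/puneett/", 1471856400000))
    # Prefix with a base name: easier to find on disk.
    print(batch_output_path("file:///home/puneett/kafka-out", 1471856400000, "txt"))
```

Under that rule, a prefix such as file:///home/puneett/kafka-out gives one kafka-out-<timestamp> directory per batch interval, each holding the usual part files.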
RE: Spark with HBase Error - Py4JJavaError
Hi Ram,
Thanks very much, it worked.
Puneet

From: ram kumar [mailto:ramkumarro...@gmail.com]
Sent: Thursday, July 07, 2016 6:51 PM
To: Puneet Tripathi
Cc: user@spark.apache.org
Subject: Re: Spark with HBase Error - Py4JJavaError

Hi Puneet,
Have you tried appending --jars $SPARK_HOME/lib/spark-examples-*.jar to the execution command?
Ram

On Thu, Jul 7, 2016 at 5:19 PM, Puneet Tripathi <puneet.tripa...@dunnhumby.com> wrote:

Guys,
Please can anyone help on the issue below?
Puneet

From: Puneet Tripathi [mailto:puneet.tripa...@dunnhumby.com]
Sent: Thursday, July 07, 2016 12:42 PM
To: user@spark.apache.org
Subject: Spark with HBase Error - Py4JJavaError

Hi,
We are running HBase in fully distributed mode. I tried to connect to HBase via pyspark and then write to HBase using saveAsNewAPIHadoopDataset, but it failed. The error says:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset.
: java.lang.ClassNotFoundException: org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

I have been able to create pythonconverters.jar and then did the following:
1. I think we have to copy this to a location on HDFS; /sparkjars/ seems as good a directory to create as any. I think the file has to be world readable.
2. Set the spark_jar_hdfs_path property in Cloudera Manager, e.g. hdfs:///sparkjars

It still doesn’t seem to work. Can someone please help me with this?

Regards,
Puneet
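Ram's suggestion, sketched as a small launcher in case the job is started from Python rather than from a shell. --jars takes a comma-separated list, and a shell would normally expand the glob, so the helper does that expansion itself. find_jars and submit_args are hypothetical names, hbase_write.py is a placeholder script name, and the jar location is the one from the message above.

```python
import glob
import os


def find_jars(pattern):
    """Expand environment variables and the glob, as a shell would."""
    return sorted(glob.glob(os.path.expandvars(pattern)))


def submit_args(script, jars):
    """Build a spark-submit argument list with an optional --jars entry."""
    args = ["spark-submit"]
    if jars:
        # --jars expects one comma-separated value, not repeated flags
        args += ["--jars", ",".join(jars)]
    args.append(script)
    return args


if __name__ == "__main__":
    jars = find_jars("$SPARK_HOME/lib/spark-examples-*.jar")
    # hbase_write.py is a placeholder for the actual PySpark script
    print(" ".join(submit_args("hbase_write.py", jars)))
```

The resulting list can be passed to subprocess.call as-is. If the glob matches nothing, no --jars entry is added and the ClassNotFoundException would be expected to persist.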
RE: Spark with HBase Error - Py4JJavaError
Guys,
Please can anyone help on the issue below?
Puneet

From: Puneet Tripathi [mailto:puneet.tripa...@dunnhumby.com]
Sent: Thursday, July 07, 2016 12:42 PM
To: user@spark.apache.org
Subject: Spark with HBase Error - Py4JJavaError

Hi,
We are running HBase in fully distributed mode. I tried to connect to HBase via pyspark and then write to HBase using saveAsNewAPIHadoopDataset, but it failed. The error says:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset.
: java.lang.ClassNotFoundException: org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

I have been able to create pythonconverters.jar and then did the following:
1. I think we have to copy this to a location on HDFS; /sparkjars/ seems as good a directory to create as any. I think the file has to be world readable.
2. Set the spark_jar_hdfs_path property in Cloudera Manager, e.g. hdfs:///sparkjars

It still doesn't seem to work. Can someone please help me with this?

Regards,
Puneet
Spark with HBase Error - Py4JJavaError
Hi,
We are running HBase in fully distributed mode. I tried to connect to HBase via pyspark and then write to HBase using saveAsNewAPIHadoopDataset, but it failed. The error says:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset.
: java.lang.ClassNotFoundException: org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

I have been able to create pythonconverters.jar and then did the following:
1. I think we have to copy this to a location on HDFS; /sparkjars/ seems as good a directory to create as any. I think the file has to be world readable.
2. Set the spark_jar_hdfs_path property in Cloudera Manager, e.g. hdfs:///sparkjars

It still doesn't seem to work. Can someone please help me with this?

Regards,
Puneet
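For context, here is a sketch of the kind of settings a PySpark HBase write usually passes to saveAsNewAPIHadoopDataset. It is modelled from memory on the hbase_outputformat.py example bundled with Spark 1.x, so the keys should be checked against your version. The key converter is the class named in the error above. The value converter, the host name, the table name and the helper name hbase_write_settings are assumptions. In Spark 1.x the converter classes ship in the spark-examples jar, so a ClassNotFoundException like the one above suggests that jar is not on the classpath.

```python
def hbase_write_settings(zk_host, table):
    """Return (conf, key_converter, value_converter) for an HBase write.

    Assumption: key names follow Spark 1.x's hbase_outputformat.py example.
    """
    conf = {
        "hbase.zookeeper.quorum": zk_host,
        "hbase.mapred.outputtable": table,
        "mapreduce.outputformat.class":
            "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
        "mapreduce.job.output.key.class":
            "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "mapreduce.job.output.value.class":
            "org.apache.hadoop.io.Writable",
    }
    # Class named in the ClassNotFoundException above
    key_converter = ("org.apache.spark.examples.pythonconverters."
                     "StringToImmutableBytesWritableConverter")
    # Assumed companion converter from the same examples package
    value_converter = ("org.apache.spark.examples.pythonconverters."
                       "StringListToPutConverter")
    return conf, key_converter, value_converter


# Usage (needs a live SparkContext and an RDD of (row_key, [row, cf, col, value])):
# conf, key_conv, value_conv = hbase_write_settings("zk-host", "my_table")
# rdd.saveAsNewAPIHadoopDataset(conf=conf,
#                               keyConverter=key_conv,
#                               valueConverter=value_conv)
```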