Hi Zhang & All,

Thanks so much for your earlier response. I am trying to get to the final
solution. We can parse the data successfully; however, I am getting a
NullPointerException while mapping it back onto the rows. Can you please
advise on the findings below?

spark-shell --packages com.databricks:spark-xml_2.11:0.4.1
//----------------------------------
scala> spark.version
res19: String = 2.0.0

scala> import com.databricks.spark.xml.XmlReader

scala> def xml2DF(xml: String) = {
     |   val xRDD = sc.parallelize(Seq(xml))
     |   val df = new XmlReader().xmlRdd(spark.sqlContext, xRDD)
     |   val new_df = df.withColumn("comment", explode(df("Comments.Comment")))
     |     .select($"comment.Description", $"comment.Title")
     |   new_df.collectAsList
     | }
xml2DF: (xml: String)java.util.List[org.apache.spark.sql.Row]
//-----------------------------------
scala> val xml3 = "<books><Comments><Comment><Title>Title3.1</Title><Description>Descript3.1</Description></Comment><Comment><Title>Title3333.1</Title><Description>Descript3333.1</Description></Comment></Comments></books>"

scala> xml2DF(xml3)
res18: java.util.List[org.apache.spark.sql.Row] =
[[Descript3.1,Title3.1], [Descript3333.1,Title3333.1]]
//xml2DF works as expected
//-----------------------------------
val rdd = sc.textFile("file:///home/spark/XML_Project/data.txt")
val xml_pRDDs = rdd.map(x=>(x.split(',')(0).toInt, x.split(',')(3)))
scala> xml_pRDDs.map(x=>(x._1,"call xml2DF "+x._2)).collect
res17: Array[(Int, String)] = Array((1,call xml2DF
<books><Comments><Comment><Title>Title1.1</Title><Description>Descript1.1</D..
//-----------------------------------
xml_pRDDs.map(x=>(x._1,xml2DF(x._2))).collect
// This throws a NullPointerException; I was expecting a result like the one below:
// Array[(Int, java.util.List[org.apache.spark.sql.Row])] =
// Array((1,[[Descript1.1,Title1.1], [Descript1.2,Title1.2], [Descript1.3,Title1.3]]),....
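
A likely cause, offered as a sketch rather than a confirmed diagnosis: xml2DF
uses sc and spark.sqlContext, which exist only on the driver, while the
function passed to xml_pRDDs.map runs on the executors, where those references
are typically null. One way around this is to parse each XML string with a
plain parser inside the closure. The example below assumes scala-xml
(scala.xml.XML) is on the classpath, as it usually is in spark-shell;
parseComments is a hypothetical helper name and is not part of spark-xml.

```scala
import scala.xml.XML

// Hypothetical helper: extracts (Description, Title) pairs from one XML string.
// It uses only scala.xml, so it does not need a SparkContext or SparkSession.
def parseComments(xml: String): List[(String, String)] = {
  val root = XML.loadString(xml)
  (root \ "Comments" \ "Comment").toList.map { c =>
    ((c \ "Description").text, (c \ "Title").text)
  }
}

val sample = "<books><Comments><Comment><Title>Title3.1</Title><Description>Descript3.1</Description></Comment><Comment><Title>Title3333.1</Title><Description>Descript3333.1</Description></Comment></Comments></books>"
val parsed = parseComments(sample)
println(parsed)

// In spark-shell, with xml_pRDDs: RDD[(Int, String)] as defined above, this
// would yield one (id, description, title) tuple per comment:
// xml_pRDDs.flatMap { case (id, xml) =>
//   parseComments(xml).map { case (d, t) => (id, d, t) }
// }.collect
```

If the DataFrame route is preferred, xml2DF can still be called on the driver,
for example over xml_pRDDs.collect, at the cost of one Spark job per row.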
====================
Below is the content of the data.txt file.
1,Amol,Kolhapur,<books><Comments><Comment><Title>Title1.1</Title><Description>Descript1.1</Description></Comment><Comment><Title>Title1.2</Title><Description>Descript1.2</Description></Comment><Comment><Title>Title1.3</Title><Description>Descript1.3</Description></Comment></Comments></books>
2,Ameet,Bangalore,<books><Comments><Comment><Title>Title2.1</Title><Description>Descript2.1</Description></Comment><Comment><Title>Title2.2</Title><Description>Descript2.2</Description></Comment></Comments></books>
3,Rajesh,Jaipur,<books><Comments><Comment><Title>Title3.1</Title><Description>Descript3.1</Description></Comment><Comment><Title>Title3.2</Title><Description>Descript3.2</Description></Comment><Comment><Title>Title3.3</Title><Description>Descript3.3</Description></Comment><Comment><Title>Title3.4</Title><Description>Descript3.4</Description></Comment></Comments></books>
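
A small note on reading data.txt: the XML sits in the fourth comma-separated
field, so a split limited to four parts keeps the XML whole even if it ever
contains a comma, and each line is split only once. This is a sketch under
that assumption; Record and parseLine are hypothetical names, and the input
line in the example is made up.

```scala
// Hypothetical record type for one line of data.txt.
case class Record(id: Int, name: String, city: String, xml: String)

// Splits into at most four fields, so commas inside the XML stay in the
// last field. A line with fewer than four fields throws a MatchError.
def parseLine(line: String): Record = {
  val Array(id, name, city, xml) = line.split(",", 4)
  Record(id.trim.toInt, name, city, xml)
}

// Made-up input line whose last field contains a comma.
val rec = parseLine("9,Name,City,<a>x, y</a>")
println(rec)
```

In the session above this could replace the two split calls, for example
rdd.map(parseLine).map(r => (r.id, r.xml)).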

Regards,
Amol



On 6/29/17, Yong Zhang <java8...@hotmail.com> wrote:
> scala>spark.version
> res6: String = 2.1.1
>
> scala>val rdd  =
> sc.parallelize(Seq("""<books><Comments><Comment><Title>Title1.1</Title><Description>Description_1.1</Description></Comment>
> <Comment><Title>Title1.2</Title><Description>Description_1.2</Description></Comment>
> <Comment><Title>Title1.3</Title><Description>Description_1.3</Description></Comment>
> </Comments></books>"""))
> rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at
> parallelize at <console>:24
>
> scala>import com.databricks.spark.xml.XmlReader
>
> scala>val df = new XmlReader().xmlRdd(spark.sqlContext, rdd)
> df: org.apache.spark.sql.DataFrame = [Comments: struct<Comment:
> array<struct<Description:string,Title:string>>>]
>
> scala>df.printSchema
> root
>  |-- Comments: struct (nullable = true)
>  |    |-- Comment: array (nullable = true)
>  |    |    |-- element: struct (containsNull = true)
>  |    |    |    |-- Description: string (nullable = true)
>  |    |    |    |-- Title: string (nullable = true)
>
> scala>df.show(false)
> +--------------------------------------------------------------------------------------------------+
> |Comments                                                                                          |
> +--------------------------------------------------------------------------------------------------+
> |[WrappedArray([Description_1.1,Title1.1], [Description_1.2,Title1.2], [Description_1.3,Title1.3])]|
> +--------------------------------------------------------------------------------------------------+
>
>
> scala>df.withColumn("comment",
> explode(df("Comments.Comment"))).select($"comment.Description",
> $"comment.Title").show
> +---------------+--------+
> |    Description|   Title|
> +---------------+--------+
> |Description_1.1|Title1.1|
> |Description_1.2|Title1.2|
> |Description_1.3|Title1.3|
> +---------------+--------+
>
>
>
> ________________________________
> From: Talap, Amol <amol.ta...@capgemini.com>
> Sent: Thursday, June 29, 2017 9:38 AM
> To: Judit Planas; user@spark.apache.org
> Subject: RE: SparkSQL to read XML Blob data to create multiple rows
>
>
> Thanks Judit, Ayan
>
> Judit,
>
> You almost got it. The explode might help here.
>
> But when I tried it, load() would not read from the xmlcomment column of
> oracle_data.
>
>
>
> scala> val xmlDF = sqlContext.sql("SELECT * FROM oracle_data")
>
> 17/06/29 18:31:58 INFO parse.ParseDriver: Parsing command: SELECT * FROM
> oracle_data
>
> 17/06/29 18:31:58 INFO parse.ParseDriver: Parse Completed
>
> …
>
> scala> val xmlDF_flattened =
> xmlDF.withColumn("xmlcomment",explode(sqlContext.read.format("com.databricks.spark.xml").option("rowTag","book").load($"xmlcomment")))
>
> <console>:22: error: overloaded method value load with alternatives:
>
>   ()org.apache.spark.sql.DataFrame <and>
>
>   (path: String)org.apache.spark.sql.DataFrame
>
> cannot be applied to (org.apache.spark.sql.ColumnName)
>
>        val xmlDF_flattened =
> xmlDF.withColumn("xmlcomment",explode(sqlContext.read.format("com.databricks.spark.xml").option("rowTag","book").load($"xmlcomment")))
>
>
>
> Ayan,
>
> Output of books_inexp.show was as below:
>
> title           | author
> Midnight Rain   | Ralls, Kim
> Maeve Ascendant | Corets, Eva
>
>
>
> Regards,
>
> Amol
>
> From: Judit Planas [mailto:judit.pla...@epfl.ch]
> Sent: Thursday, June 29, 2017 3:46 AM
> To: user@spark.apache.org
> Subject: Re: SparkSQL to read XML Blob data to create multiple rows
>
>
>
> Hi Amol,
>
> I'm not sure I completely understand your question, but the SQL function
> "explode" may help you:
> http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.explode
>
> Here you can find a nice example:
> https://stackoverflow.com/questions/38210507/explode-in-pyspark
>
> HTH,
> Judit
>
> On 29/06/17 09:05, ayan guha wrote:
>
> Hi
>
>
>
> Not sure if I follow your issue. Can you please post output of
> books_inexp.show()?
>
>
>
> On Thu, Jun 29, 2017 at 2:30 PM, Talap, Amol
> <amol.ta...@capgemini.com<mailto:amol.ta...@capgemini.com>> wrote:
>
> Hi:
>
>
>
> We are trying to parse XML data to get the output below from the given input
> sample.
>
> Can someone suggest a way to pass one DataFrame's output into the load()
> function, or any other alternative to get this output?
>
>
>
> Input Data from Oracle Table XMLBlob:
>
> SequenceID | Name    | City      | XMLComment
> 1          | Amol    | Kolhapur  | <books><Comments><Comment><Title>Title1.1</Title><Description>Description_1.1</Description><Comment><Title>Title1.2</Title><Description>Description_1.2</Description><Comment><Title>Title1.3</Title><Description>Description_1.3</Description></Comment></Comments></books>
> 2          | Suresh  | Mumbai    | <books><Comments><Comment><Title>Title2</Title><Description>Description_2</Description></Comment></Comments></books>
> 3          | Vishal  | Delhi     | <books><Comments><Comment><Title>Title3</Title><Description>Description_3</Description></Comment></Comments></books>
> 4          | Swastik | Bangalore | <books><Comments><Comment><Title>Title4</Title><Description>Description_4</Description></Comment></Comments></books>
>
>
> Output Data Expected using Spark SQL:
>
> SequenceID | Name    | City      | Title    | Description
> 1          | Amol    | Kolhapur  | Title1.1 | Description_1.1
> 1          | Amol    | Kolhapur  | Title1.2 | Description_1.2
> 1          | Amol    | Kolhapur  | Title1.3 | Description_1.3
> 2          | Suresh  | Mumbai    | Title2   | Description_2
> 3          | Vishal  | Delhi     | Title3   | Description_3
> 4          | Swastik | Bangalore | Title4   | Description_4
>
>
> I am able to parse a single XML document in spark-shell using the approach
> below, but how do we apply the same to all rows?
>
> https://community.hortonworks.com/questions/71538/parsing-xml-in-spark-rdd.html
>
> val dfX = sqlContext.read.format("com.databricks.spark.xml").option("rowTag","book").load("books.xml")
> val xData = dfX.registerTempTable("books")
> dfX.printSchema()
> val books_inexp = sqlContext.sql("select title,author from books where price<10")
> books_inexp.show
>
>
>
>
> Regards,
>
> Amol
>
> This message contains information that may be privileged or confidential and
> is the property of the Capgemini Group. It is intended only for the person
> to whom it is addressed. If you are not the intended recipient, you are not
> authorized to read, print, retain, copy, disseminate, distribute, or use
> this message or any part thereof. If you receive this message in error,
> please notify the sender immediately and delete all copies of this message.
>
>
>
>
>
> --
>
> Best Regards,
> Ayan Guha
>
