Re: SparkSQL to read XML Blob data to create multiple rows

2017-07-08 Thread Amol Talap
Hi Zhang & All,

Thanks so much for your earlier response. I am trying to get to the final
solution. We could parse the data successfully, however I am getting a
NullPointerException while mapping it back. Can you please advise on the
findings below?

spark-shell --packages com.databricks:spark-xml_2.11:0.4.1
//--
scala> spark.version
res19: String = 2.0.0

scala> import com.databricks.spark.xml.XmlReader
scala> import org.apache.spark.sql.functions.explode

scala> def xml2DF(xml: String) = {
     |   val xRDD = sc.parallelize(Seq(xml))
     |   val df = new XmlReader().xmlRdd(spark.sqlContext, xRDD)
     |   val new_df = df.withColumn("comment", explode(df("Comments.Comment")))
     |     .select($"comment.Description", $"comment.Title")
     |   new_df.collectAsList
     | }
xml2DF: (xml: String)java.util.List[org.apache.spark.sql.Row]
//---
scala> val xml3 = "<Comments><Comment><Title>Title3.1</Title><Description>Descript3.1</Description></Comment><Comment><Title>Title.1</Title><Description>Descript.1</Description></Comment></Comments>"

scala> xml2DF(xml3)
res18: java.util.List[org.apache.spark.sql.Row] =
[[Descript3.1,Title3.1], [Descript.1,Title.1]]
//xml2DF works as expected
//---
val rdd = sc.textFile("file:///home/spark/XML_Project/data.txt")
val xml_pRDDs = rdd.map(x=>(x.split(',')(0).toInt, x.split(',')(3)))
scala> xml_pRDDs.map(x=>(x._1,"call xml2DF "+x._2)).collect
res17: Array[(Int, String)] = Array((1,call xml2DF <Comments><Comment><Title>Title1.1</Title><Description>Descript1.1</Description></Comment>...), ...)

scala> xml_pRDDs.map(x=>(x._1,xml2DF(x._2))).collect
//This throws a NullPointerException; I was expecting a result like the below
//Array[(Int, java.util.List[org.apache.spark.sql.Row])] =
//Array((1,[[Descript3.1,Title3.1], [Descript.1,Title.1]]), ...

Below is the content of the data.txt file.
1,Amol,Kolhapur,<Comments><Comment><Title>Title1.1</Title><Description>Descript1.1</Description></Comment><Comment><Title>Title1.2</Title><Description>Descript1.2</Description></Comment><Comment><Title>Title1.3</Title><Description>Descript1.3</Description></Comment></Comments>
2,Ameet,Bangalore,<Comments><Comment><Title>Title2.1</Title><Description>Descript2.1</Description></Comment><Comment><Title>Title2.2</Title><Description>Descript2.2</Description></Comment></Comments>
3,Rajesh,Jaipur,<Comments><Comment><Title>Title3.1</Title><Description>Descript3.1</Description></Comment><Comment><Title>Title3.2</Title><Description>Descript3.2</Description></Comment><Comment><Title>Title3.3</Title><Description>Descript3.3</Description></Comment><Comment><Title>Title3.4</Title><Description>Descript3.4</Description></Comment></Comments>
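
One possibility I am considering: since xml2DF calls sc.parallelize and XmlReader inside map(), it runs on the executors, where SparkContext/SQLContext are not available, which could explain the NullPointerException. Below is only a rough sketch of a workaround that parses each blob with plain scala.xml instead of a nested Spark job (parseComments is just an illustrative helper; it assumes the scala-xml module is on the classpath):

import scala.xml.XML

// Hypothetical helper: parse one <Comments> blob into (Title, Description)
// pairs with plain scala.xml, so nothing Spark-specific runs on the executors.
def parseComments(xml: String): Seq[(String, String)] = {
  val root = XML.loadString(xml)
  (root \ "Comment").map { c =>
    ((c \ "Title").text, (c \ "Description").text)
  }
}

val rdd = sc.textFile("file:///home/spark/XML_Project/data.txt")
// Limit the split to 4 columns so the XML (4th field) stays intact.
val parsed = rdd.map { line =>
  val cols = line.split(",", 4)
  (cols(0).toInt, parseComments(cols(3)))
}
parsed.collect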

Regards,
Amol



On 6/29/17, Yong Zhang  wrote:
> scala>spark.version
> res6: String = 2.1.1
>
> scala>val rdd  =
> sc.parallelize(Seq("""<Comments><Comment><Title>Title1.1</Title><Description>Description_1.1</Description></Comment>
> <Comment><Title>Title1.2</Title><Description>Description_1.2</Description></Comment>
> <Comment><Title>Title1.3</Title><Description>Description_1.3</Description></Comment>
> </Comments>"""))
> rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at
> parallelize at <console>:24
>
> scala>import com.databricks.spark.xml.XmlReader
>
> scala>val df = new XmlReader().xmlRdd(spark.sqlContext, rdd)
> df: org.apache.spark.sql.DataFrame = [Comments: struct<Comment: array<struct<Description: string, Title: string>>>]
>
> scala>df.printSchema
> root
>  |-- Comments: struct (nullable = true)
>  ||-- Comment: array (nullable = true)
>  |||-- element: struct (containsNull = true)
>  ||||-- Description: string (nullable = true)
>  ||||-- Title: string (nullable = true)
>
> scala>df.show(false)
> +---------------------------------------------------------------------------------------------------+
> |Comments                                                                                           |
> +---------------------------------------------------------------------------------------------------+
> |[WrappedArray([Description_1.1,Title1.1], [Description_1.2,Title1.2], [Description_1.3,Title1.3])] |
> +---------------------------------------------------------------------------------------------------+
>
>
> scala>df.withColumn("comment",
> explode(df("Comments.Comment"))).select($"comment.Description",
> $"comment.Title").show
> +---------------+--------+
> |    Description|   Title|
> +---------------+--------+
> |Description_1.1|Title1.1|
> |Description_1.2|Title1.2|
> |Description_1.3|Title1.3|
> +---------------+--------+
>
>
>
> 
> From: Talap, Amol 
> Sent: Thursday, June 29, 2017 9:38 AM
> To: Judit Planas; user@spark.apache.org
> Subject: RE: SparkSQL to read XML Blob data to create multiple rows
>
>
> Thanks Judit, Ayan
>
> Judit,
>
> You almost got it. The explode might help here.
>
> But when I tried it, I found that load() does not read from the xmlcomment column of
> oracle_data.
>
>
>
> scala> val xmlDF = sqlContext.sql("SELECT * FROM oracle_data")
>
> 17/06/29 18:31:58 INFO parse.ParseDriver: Parsing command: SELECT * FROM
> oracle_data
>
> 17/06/29 18:31:58 INFO parse.ParseDriver: Parse Completed
>
> …
>
> scala> val xmlDF_flattened =
> xmlDF.withColumn("xmlcomment",explode(sqlContext.read.format("com.databricks.spark.xml").option("rowTag","book").load($"xmlcomment")))
>
> <console>:22: error: overloaded method value load with alternatives:
>
>   ()org.apache.spark.sql.DataFrame <and>
>
>   (path: String)org.apache.spark.sql.DataFrame
>
> cannot be applied to (org.apache.spark.sql.ColumnName)
>
>val xmlDF_flattened =
> xmlDF.withColumn("xmlcomment",explode(sqlContext.read.format("com.databricks.spark.xml").option("rowTag","book").load($"xmlcomment")))
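
One workaround I am considering for this: since load() only accepts a path, pull the xmlcomment column out as an RDD[String] and hand it to XmlReader directly, the same way xml2DF does above. This is only a sketch and assumes xmlcomment holds the raw XML text:

import com.databricks.spark.xml.XmlReader

// Extract the XML strings from the column, then parse them with XmlReader.
val xmlRDD  = xmlDF.select("xmlcomment").rdd.map(_.getString(0))
val booksDF = new XmlReader().xmlRdd(sqlContext, xmlRDD)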
>
>
>
> Ayan,
>
> Output of books_inexp.show was as below:
>
> +---------------+-----------+
> |          title|     author|
> +---------------+-----------+
> |  Midnight Rain| Ralls, Kim|
> |Maeve Ascendant|Corets, Eva|
> +---------------+-----------+
>
>
>
> Regards,
>
> Amol
>
> From: Judit Planas [mailto:judit.pla...@epfl.ch]
> Sent: Thursday, June 29, 2017 3:46 AM
> To: user@spark.apache.org
> Subject: 

Glue-like Functionality

2017-07-08 Thread Benjamin Kim
Has anyone seen AWS Glue? I was wondering if there is something similar going 
to be built into Spark Structured Streaming? I like the Data Catalog idea to 
store and track any data source/destination. It profiles the data to derive the 
schema and data types. Also, it does some sort of automated schema evolution 
when or if the schema changes. It leaves only the transformation logic to the 
ETL developer. I think some of this can enhance or simplify Structured 
Streaming. For example, AWS S3 can be catalogued as a Data Source; in 
Structured Streaming, Input DataFrame is created like a SQL view based off of 
the S3 Data Source; lastly, the Transform logic, if any, just manipulates the 
data going from the Input DataFrame to the Result DataFrame, which is another 
view based off of a catalogued Data Destination. This would relieve the ETL 
developer from caring about any Data Source or Destination. All server 
information, access credentials, data schemas, folder directory structures, 
file formats, and any other properties can be securely stored away with only a 
select few.
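
A rough Structured Streaming sketch of the flow I have in mind (the bucket, paths, and schema below are made up purely for illustration):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Hypothetical catalogued S3 Data Source: path and schema are illustrative only.
val schema = new StructType()
  .add("id", LongType)
  .add("value", StringType)

// Input DataFrame: a streaming "view" over the S3 Data Source
val input = spark.readStream
  .schema(schema)
  .json("s3a://example-bucket/incoming/")

// Transform logic -- the only piece left to the ETL developer
val result = input.filter(col("value").isNotNull)

// Result DataFrame flowing to the catalogued Data Destination
val query = result.writeStream
  .format("parquet")
  .option("path", "s3a://example-bucket/output/")
  .option("checkpointLocation", "s3a://example-bucket/checkpoints/")
  .start()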

I'm just curious to know if anyone has thought the same thing.

Cheers,
Ben



Event time aggregation is possible in Spark Streaming ?

2017-07-08 Thread Swapnil Chougule
Hello,

I want to know whether event time aggregation is possible in Spark Streaming. I can
see it's possible in Structured Streaming, but as I am working with conventional
Spark Streaming, I need event time aggregation there. I checked but didn't
find any relevant documentation.
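
For reference, the Structured Streaming form I am referring to looks roughly like this (the streaming DataFrame "events" and its eventTime/word columns are made up for illustration):

import org.apache.spark.sql.functions._

// Event-time windowed count with a watermark to bound how late data may arrive.
val counts = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"), col("word"))
  .count()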

Thanks in advance

Regards,
Swapnil