Hi,

I have an XML file that is read into Spark using the Databricks jar file

spark-xml_2.11-0.9.0.jar

I am doing some tests.

This is the format of the XML (one row shown here):

//*
<sms_request>
<sms_campaign_code>SKY</sms_campaign_code>
<target_mobile_no>0123456789</target_mobile_no>
<ocis_party_id>123456789</ocis_party_id>
<brand>XYZ</brand>
<sms_template_code>GLX</sms_template_code>
<sms_request_external_ref>12345678</sms_request_external_ref>
<sms_request_external_txn_ref>55555555</sms_request_external_txn_ref>
<sms_template_variable>
</sms_template_variable>
</sms_request>
*/

I am trying to insert the data into a partitioned Hive table through Spark
as follows:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import java.util.Date
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.SaveMode

sc.setLogLevel("WARN")
import org.apache.log4j.Logger
import org.apache.log4j.Level
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

// xml stuff
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
import com.databricks.spark.xml._
import spark.implicits._
//
val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)

val broadcastValue = "123456789"
val df = spark.read.
                format("com.databricks.spark.xml").
                option("rootTag", "hierarchy").
                option("rowTag", "sms_request").
                load("/tmp/broadcast.xml")
df.printSchema
df.show(10,false)

df.createOrReplaceTempView("tmp")
// Need to create and populate target ORC table michtest.BroadcastStaging
//
HiveContext.sql("""DROP TABLE IF EXISTS michtest.BroadcastStaging""")

  var sqltext = """
  CREATE TABLE IF NOT EXISTS michtest.BroadcastStaging (
     partyId STRING
   , phoneNumber STRING
  )
  PARTITIONED BY (
     broadcastId STRING
   , brand STRING)
  STORED AS PARQUET
  """
  HiveContext.sql(sqltext)
  //
  // Put data in Hive table
  //
     // Dynamic partitioning is disabled by default. We turn it on
     spark.sql("SET hive.exec.dynamic.partition = true")
     spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict ")
     // spark.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")








  sqltext = """
  INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = broadcastValue, brand = brand)
  SELECT
          ocis_party_id AS partyId
        , target_mobile_no AS phoneNumber
  FROM tmp
  """
  spark.sql(sqltext)
  spark.sql("select * from michtest.BroadcastStaging").show(10,false)


This does not work because I need to pass a one-time fixed value for the
partition column *broadcastId* and a dynamic value for the *brand* column,
taken from the table itself.
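To make the requirement concrete, here is a minimal sketch of one way the statement might be written, mixing a static partition value with a dynamic one. It assumes that the Scala value has to be interpolated into the SQL text as a quoted literal (the parser sees only the string, not the Scala variable), and that the dynamic partition column is named without a value in the PARTITION clause and selected last. The helper name buildInsertSql is hypothetical, and this has not been verified against the Spark and Hive versions used here.

```scala
// Hypothetical helper: builds the INSERT text with a static value for
// broadcastId and a dynamic partition on brand.
def buildInsertSql(broadcastValue: String): String =
  s"""
     |INSERT INTO TABLE michtest.BroadcastStaging
     |PARTITION (broadcastId = '$broadcastValue', brand)
     |SELECT
     |    ocis_party_id    AS partyId
     |  , target_mobile_no AS phoneNumber
     |  , brand
     |FROM tmp
     |""".stripMargin

// Same fixed value as in the script above
val sqltext = buildInsertSql("123456789")
println(sqltext)
// In spark-shell the statement would then be submitted with: spark.sql(sqltext)
```

The static partition column comes first in the PARTITION clause, and brand is the last column of the SELECT list so that it can supply the dynamic partition value.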


This is the outcome of the run:


Started at
[16/04/2020 00:37:34.34]
root
 |-- brand: string (nullable = true)
 |-- ocis_party_id: long (nullable = true)
 |-- sms_campaign_code: string (nullable = true)
 |-- sms_request_external_ref: long (nullable = true)
 |-- sms_request_external_txn_ref: long (nullable = true)
 |-- sms_template_code: string (nullable = true)
 |-- sms_template_variable: string (nullable = true)
 |-- target_mobile_no: long (nullable = true)

+-----+-------------+-----------------+------------------------+----------------------------+-----------------+---------------------+----------------+
|brand|ocis_party_id|sms_campaign_code|sms_request_external_ref|sms_request_external_txn_ref|sms_template_code|sms_template_variable|target_mobile_no|
+-----+-------------+-----------------+------------------------+----------------------------+-----------------+---------------------+----------------+
|XYZ  |123456789    |SKY              |12345678                |55555555                    |GLX              |                     |123456789       |
+-----+-------------+-----------------+------------------------+----------------------------+-----------------+---------------------+----------------+

org.apache.spark.sql.catalyst.parser.ParseException:
missing STRING at ','(line 2, pos 85)

== SQL ==

  INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = broadcastValue, brand = dummy)
-------------------------------------------------------------------------------------^^^
  SELECT
          ocis_party_id AS partyId
        , target_mobile_no AS phoneNumber
  FROM tmp

It fails when passing the partition values.


Thanks,


Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
