AFAICT, we can use spark.sql(s"select $name ..."), name is a value in
Scala context[1].

-- 
Cheers,
-z

[1] https://docs.scala-lang.org/overviews/core/string-interpolation.html

On Fri, 17 Apr 2020 00:10:59 +0100
Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Thanks Patrick,
> 
> The partition  broadcastId is static as defined as a value below
> 
> 
> val broadcastValue = "123456789"  // I assume this will be sent as a
> constant for the batch
> 
> // Create a DF on top of XML
> val df = spark.read.
>                 format("com.databricks.spark.xml").
>                 option("rootTag", "hierarchy").
>                 option("rowTag", "sms_request").
>                 load("/tmp/broadcast.xml")
> 
> // add this constant column to dataframe itself
> val newDF = df.withColumn("broadcastId", lit(broadcastValue))
> 
> newDF.show(100,false)
> 
> newDF.createOrReplaceTempView("tmp")
> 
> // Need to create and populate target Parquet table
> michtest.BroadcastStaging
> //
> HiveContext.sql("""DROP TABLE IF EXISTS michtest.BroadcastStaging""")
> 
>   var sqltext = """
>   CREATE TABLE IF NOT EXISTS michtest.BroadcastStaging (
>      partyId STRING
>    , phoneNumber STRING
>   )
>   PARTITIONED BY (
>      broadcastId STRING
>    , brand STRING
> )
>   STORED AS PARQUET
>   """
>   HiveContext.sql(sqltext)
>   //
>   // Put data in Hive table
>   //
>      // Dynamic partitioning is disabled by default. We turn it on
>      spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
> 
> // Now put static partition (broadcastId) first and dynamic partition
> (brand)last
> 
>   sqltext = """
>   INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
> broadcastValue, brand)
>   SELECT
>           ocis_party_id AS partyId
>         , target_mobile_no AS phoneNumber
>         , broadcastId
>         , brand
>   FROM tmp
>   """
>   spark.sql(sqltext)
> 
> Still not working properly
> 
> sqltext: String =
>   INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
> broadcastValue, brand)
>   SELECT
>           ocis_party_id AS partyId
>         , target_mobile_no AS phoneNumber
>         , broadcastId
>         , brand
>   FROM tmp
> 
> scala>   spark.sql(sqltext)
> org.apache.spark.sql.catalyst.parser.ParseException:
> missing STRING at ','(line 2, pos 85)
> 
> == SQL ==
> 
>   INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
> broadcastValue, brand)
> -------------------------------------------------------------------------------------^^^
>   SELECT
>           ocis_party_id AS partyId
>         , target_mobile_no AS phoneNumber
>         , broadcastId
>         , brand
>   FROM tmp
> 
>   at
> org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:241)
>   at
> org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:117)
>   at
> org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
>   at
> org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:69)
>   at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
>   ... 55 elided
> 
> 
> The thing is that if I replace broadcastId = broadcastValue with  broadcastId
> = " 123456789" it works!
> 
> 
> Thanks,
> 
> 
> Dr Mich Talebzadeh
> 
> 
> 
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
> 
> 
> 
> http://talebzadehmich.wordpress.com
> 
> 
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
> 
> 
> 
> 
> On Thu, 16 Apr 2020 at 13:25, Patrick McCarthy
> <pmccar...@dstillery.com.invalid> wrote:
> 
> > What happens if you change your insert statement to be
> >
> >   INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
> > broadcastValue, brand)
> >
> > and then add the value for brand into the select as
> >   SELECT
> >           ocis_party_id AS partyId
> >         , target_mobile_no AS phoneNumber
> >         , brand
> > You may need to rearrange the order of the partitions to put dynamic
> > partitions before static.
> >
> > On Wed, Apr 15, 2020 at 7:48 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> > wrote:
> >
> >> Hi,
> >>
> >> I have an XML file that is read into Spark using Databa bricks jar file
> >>
> >> spark-xml_2.11-0.9.0.jar
> >>
> >> Doing some tests
> >>
> >> This is the format of XML (one row here)
> >>
> >> //*
> >> <sms_request>
> >> <sms_campaign_code>SKY</sms_campaign_code>
> >> <target_mobile_no>0123456789</target_mobile_no>
> >> <ocis_party_id>123456789</ocis_party_id>
> >> <brand>XYZ</brand>
> >> <sms_template_code>GLX</sms_template_code>
> >> <sms_request_external_ref>12345678</sms_request_external_ref>
> >> <sms_request_external_txn_ref>55555555</sms_request_external_txn_ref>
> >> <sms_template_variable>
> >> </sms_template_variable>
> >> </sms_request>
> >> */
> >>
> >> OK I am trying to insert data into a hive partitioned table through spark
> >> as follows:
> >>
> >> import org.apache.spark.sql.DataFrame
> >> import org.apache.spark.sql.functions._
> >> import java.util.Date
> >> import org.apache.spark.sql.functions.col
> >> import org.apache.spark.sql.SaveMode
> >>
> >> sc.setLogLevel("WARN")
> >> import org.apache.log4j.Logger
> >> import org.apache.log4j.Level
> >> Logger.getLogger("org").setLevel(Level.OFF)
> >> Logger.getLogger("akka").setLevel(Level.OFF)
> >>
> >> // xml stuff
> >> import com.databricks.spark.xml.functions.from_xml
> >> import com.databricks.spark.xml.schema_of_xml
> >> import org.apache.spark.sql.SparkSession
> >> import org.apache.spark.sql.types.{StructType, StructField, StringType,
> >> DoubleType}
> >> import com.databricks.spark.xml._
> >> import spark.implicits._
> >> //
> >> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> >> println ("\nStarted at"); HiveContext.sql("SELECT
> >> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> >> ").collect.foreach(println)
> >>
> >> val broadcastValue = "123456789"
> >> val df = spark.read.
> >>                 format("com.databricks.spark.xml").
> >>                 option("rootTag", "hierarchy").
> >>                 option("rowTag", "sms_request").
> >>                 load("/tmp/broadcast.xml")
> >> df.printSchema
> >> df.show(10,false)
> >>
> >> df.createOrReplaceTempView("tmp")
> >> // Need to create and populate target ORC table michtest.BroadcastStaging
> >> //
> >> HiveContext.sql("""DROP TABLE IF EXISTS michtest.BroadcastStaging""")
> >>
> >>   var sqltext = """
> >>   CREATE TABLE IF NOT EXISTS michtest.BroadcastStaging (
> >>      partyId STRING
> >>    , phoneNumber STRING
> >>   )
> >>   PARTITIONED BY (
> >>      broadcastId STRING
> >>    , brand STRING)
> >>   STORED AS PARQUET
> >>   """
> >>   HiveContext.sql(sqltext)
> >>   //
> >>   // Put data in Hive table
> >>   //
> >>      // Dynamic partitioning is disabled by default. We turn it on
> >>      spark.sql("SET hive.exec.dynamic.partition = true")
> >>      spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict ")
> >>      // spark.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> * sqltext = """  INSERT INTO TABLE michtest.BroadcastStaging PARTITION
> >> (broadcastId = broadcastValue, brand = brand)  SELECT
> >> ocis_party_id AS partyId        , target_mobile_no AS phoneNumber  FROM
> >> tmp  """*
> >>   spark.sql(sqltext)
> >>   spark.sql("select * from michtest.BroadcastStaging").show(10,false)
> >>
> >>
> >> This does not work because I need to pass onetime fixed value for
> >> partition column value *broadcastId *and dynamic value for *brand* column
> >> from the table itself
> >>
> >>
> >> *This is the outcome of run*
> >>
> >>
> >> Started at
> >> [16/04/2020 00:37:34.34]
> >> root
> >>  |-- brand: string (nullable = true)
> >>  |-- ocis_party_id: long (nullable = true)
> >>  |-- sms_campaign_code: string (nullable = true)
> >>  |-- sms_request_external_ref: long (nullable = true)
> >>  |-- sms_request_external_txn_ref: long (nullable = true)
> >>  |-- sms_template_code: string (nullable = true)
> >>  |-- sms_template_variable: string (nullable = true)
> >>  |-- target_mobile_no: long (nullable = true)
> >>
> >>
> >> +-----+-------------+-----------------+------------------------+----------------------------+-----------------+---------------------+----------------+
> >>
> >> |brand|ocis_party_id|sms_campaign_code|sms_request_external_ref|sms_request_external_txn_ref|sms_template_code|sms_template_variable|target_mobile_no|
> >>
> >> +-----+-------------+-----------------+------------------------+----------------------------+-----------------+---------------------+----------------+
> >> |XYZ  |123456789    |SKY              |12345678                |55555555
> >>                    |GLX              |
> >>                     |123456789       |
> >>
> >> +-----+-------------+-----------------+------------------------+----------------------------+-----------------+---------------------+----------------+
> >>
> >> org.apache.spark.sql.catalyst.parser.ParseException:
> >> missing STRING at ','(line 2, pos 85)
> >>
> >> == SQL ==
> >>
> >>   INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
> >> broadcastValue, brand = dummy)
> >>
> >> -------------------------------------------------------------------------------------^^^
> >>   SELECT
> >>           ocis_party_id AS partyId
> >>         , target_mobile_no AS phoneNumber
> >>   FROM tmp
> >>
> >> It fails passing partition values
> >>
> >>
> >> Thanks,
> >>
> >>
> >> Dr Mich Talebzadeh
> >>
> >>
> >>
> >> LinkedIn * 
> >> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
> >>
> >>
> >>
> >> http://talebzadehmich.wordpress.com
> >>
> >>
> >> *Disclaimer:* Use it at your own risk. Any and all responsibility for
> >> any loss, damage or destruction of data or any other property which may
> >> arise from relying on this email's technical content is explicitly
> >> disclaimed. The author will in no case be liable for any monetary damages
> >> arising from such loss, damage or destruction.
> >>
> >>
> >>
> >
> >
> > --
> >
> >
> > *Patrick McCarthy  *
> >
> > Senior Data Scientist, Machine Learning Engineering
> >
> > Dstillery
> >
> > 470 Park Ave South, 17th Floor, NYC 10016
> >

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to