AFAICT you can use spark.sql(s"select ... $name ..."), where name is a value in the enclosing Scala scope; the s-interpolator substitutes it into the SQL string before the parser ever sees it [1].
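For the INSERT in question that would look something like the sketch below (untested, and assuming the broadcastValue val and the tmp view from the mails further down; note the single quotes, so the static partition value reaches the parser as a SQL string literal, and note that only the dynamic column brand remains in the select list):

  val broadcastValue = "123456789"
  // The s-interpolator splices the Scala value into the SQL text; the single
  // quotes turn it into the string literal the PARTITION clause expects.
  val sqltext = s"""
    INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = '$broadcastValue', brand)
    SELECT
        ocis_party_id    AS partyId
      , target_mobile_no AS phoneNumber
      , brand            -- dynamic partition column comes last in the SELECT
    FROM tmp
  """
  spark.sql(sqltext)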
--
Cheers,
-z

[1] https://docs.scala-lang.org/overviews/core/string-interpolation.html

On Fri, 17 Apr 2020 00:10:59 +0100 Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Thanks Patrick,
>
> The partition broadcastId is static, defined as a value below:
>
> val broadcastValue = "123456789"   // I assume this will be sent as a constant for the batch
>
> // Create a DF on top of the XML
> val df = spark.read.
>   format("com.databricks.spark.xml").
>   option("rootTag", "hierarchy").
>   option("rowTag", "sms_request").
>   load("/tmp/broadcast.xml")
>
> // Add this constant column to the DataFrame itself
> val newDF = df.withColumn("broadcastId", lit(broadcastValue))
>
> newDF.show(100, false)
>
> newDF.createOrReplaceTempView("tmp")
>
> // Need to create and populate the target Parquet table michtest.BroadcastStaging
> //
> HiveContext.sql("""DROP TABLE IF EXISTS michtest.BroadcastStaging""")
>
> var sqltext = """
> CREATE TABLE IF NOT EXISTS michtest.BroadcastStaging (
>      partyId     STRING
>    , phoneNumber STRING
> )
> PARTITIONED BY (
>      broadcastId STRING
>    , brand       STRING
> )
> STORED AS PARQUET
> """
> HiveContext.sql(sqltext)
> //
> // Put data in the Hive table
> //
> // Dynamic partitioning is disabled by default. We turn it on
> spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
>
> // Now put the static partition (broadcastId) first and the dynamic partition (brand) last
>
> sqltext = """
> INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = broadcastValue, brand)
> SELECT
>     ocis_party_id    AS partyId
>   , target_mobile_no AS phoneNumber
>   , broadcastId
>   , brand
> FROM tmp
> """
> spark.sql(sqltext)
>
> Still not working properly:
>
> sqltext: String =
> INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = broadcastValue, brand)
> SELECT
>     ocis_party_id    AS partyId
>   , target_mobile_no AS phoneNumber
>   , broadcastId
>   , brand
> FROM tmp
>
> scala> spark.sql(sqltext)
> org.apache.spark.sql.catalyst.parser.ParseException:
> missing STRING at ','(line 2, pos 85)
>
> == SQL ==
>
> INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = broadcastValue, brand)
> -------------------------------------------------------------------------------------^^^
> SELECT
>     ocis_party_id    AS partyId
>   , target_mobile_no AS phoneNumber
>   , broadcastId
>   , brand
> FROM tmp
>
>   at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:241)
>   at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:117)
>   at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
>   at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:69)
>   at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
>   ... 55 elided
>
> The thing is that if I replace broadcastId = broadcastValue with broadcastId = "123456789" it works!
>
> Thanks,
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
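That last observation is the whole story: with the quotes, "123456789" is a SQL string literal, whereas the bare broadcastValue is just an identifier the parser does not recognise, hence `missing STRING`. Once an insert succeeds, a quick way to confirm that both partition values landed (assuming the table above and the sample row) would be:

  spark.sql("SHOW PARTITIONS michtest.BroadcastStaging").show(false)
  // expecting something like: broadcastId=123456789/brand=XYZ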
> On Thu, 16 Apr 2020 at 13:25, Patrick McCarthy <pmccar...@dstillery.com.invalid> wrote:
>
> > What happens if you change your insert statement to be
> >
> > INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = broadcastValue, brand)
> >
> > and then add the value for brand into the select, as
> >
> > SELECT
> >     ocis_party_id    AS partyId
> >   , target_mobile_no AS phoneNumber
> >   , brand
> >
> > You may need to rearrange the order of the partitions to put dynamic partitions before static.
> >
> > On Wed, Apr 15, 2020 at 7:48 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> >
> >> Hi,
> >>
> >> I have an XML file that is read into Spark using the Databricks jar file
> >>
> >> spark-xml_2.11-0.9.0.jar
> >>
> >> Doing some tests.
> >>
> >> This is the format of the XML (one row here):
> >>
> >> <sms_request>
> >>   <sms_campaign_code>SKY</sms_campaign_code>
> >>   <target_mobile_no>0123456789</target_mobile_no>
> >>   <ocis_party_id>123456789</ocis_party_id>
> >>   <brand>XYZ</brand>
> >>   <sms_template_code>GLX</sms_template_code>
> >>   <sms_request_external_ref>12345678</sms_request_external_ref>
> >>   <sms_request_external_txn_ref>55555555</sms_request_external_txn_ref>
> >>   <sms_template_variable>
> >>   </sms_template_variable>
> >> </sms_request>
> >>
> >> OK, I am trying to insert data into a Hive partitioned table through Spark as follows:
> >>
> >> import org.apache.spark.sql.DataFrame
> >> import org.apache.spark.sql.functions._
> >> import java.util.Date
> >> import org.apache.spark.sql.functions.col
> >> import org.apache.spark.sql.SaveMode
> >>
> >> sc.setLogLevel("WARN")
> >> import org.apache.log4j.Logger
> >> import org.apache.log4j.Level
> >> Logger.getLogger("org").setLevel(Level.OFF)
> >> Logger.getLogger("akka").setLevel(Level.OFF)
> >>
> >> // XML stuff
> >> import com.databricks.spark.xml.functions.from_xml
> >> import com.databricks.spark.xml.schema_of_xml
> >> import org.apache.spark.sql.SparkSession
> >> import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
> >> import com.databricks.spark.xml._
> >> import spark.implicits._
> >> //
> >> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> >> println("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> >>
> >> val broadcastValue = "123456789"
> >> val df = spark.read.
> >>   format("com.databricks.spark.xml").
> >>   option("rootTag", "hierarchy").
> >>   option("rowTag", "sms_request").
> >>   load("/tmp/broadcast.xml")
> >> df.printSchema
> >> df.show(10, false)
> >>
> >> df.createOrReplaceTempView("tmp")
> >> // Need to create and populate the target Parquet table michtest.BroadcastStaging
> >> //
> >> HiveContext.sql("""DROP TABLE IF EXISTS michtest.BroadcastStaging""")
> >>
> >> var sqltext = """
> >> CREATE TABLE IF NOT EXISTS michtest.BroadcastStaging (
> >>      partyId     STRING
> >>    , phoneNumber STRING
> >> )
> >> PARTITIONED BY (
> >>      broadcastId STRING
> >>    , brand       STRING
> >> )
> >> STORED AS PARQUET
> >> """
> >> HiveContext.sql(sqltext)
> >> //
> >> // Put data in the Hive table
> >> //
> >> // Dynamic partitioning is disabled by default.
> >> // We turn it on
> >> spark.sql("SET hive.exec.dynamic.partition = true")
> >> spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
> >> // spark.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
> >>
> >> sqltext = """
> >> INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = broadcastValue, brand = brand)
> >> SELECT
> >>     ocis_party_id    AS partyId
> >>   , target_mobile_no AS phoneNumber
> >> FROM tmp
> >> """
> >> spark.sql(sqltext)
> >> spark.sql("select * from michtest.BroadcastStaging").show(10, false)
> >>
> >> This does not work because I need to pass a one-time fixed value for the
> >> partition column broadcastId and a dynamic value for the brand column
> >> taken from the table itself.
> >>
> >> This is the outcome of the run:
> >>
> >> Started at
> >> [16/04/2020 00:37:34.34]
> >> root
> >>  |-- brand: string (nullable = true)
> >>  |-- ocis_party_id: long (nullable = true)
> >>  |-- sms_campaign_code: string (nullable = true)
> >>  |-- sms_request_external_ref: long (nullable = true)
> >>  |-- sms_request_external_txn_ref: long (nullable = true)
> >>  |-- sms_template_code: string (nullable = true)
> >>  |-- sms_template_variable: string (nullable = true)
> >>  |-- target_mobile_no: long (nullable = true)
> >>
> >> +-----+-------------+-----------------+------------------------+----------------------------+-----------------+---------------------+----------------+
> >> |brand|ocis_party_id|sms_campaign_code|sms_request_external_ref|sms_request_external_txn_ref|sms_template_code|sms_template_variable|target_mobile_no|
> >> +-----+-------------+-----------------+------------------------+----------------------------+-----------------+---------------------+----------------+
> >> |XYZ  |123456789    |SKY              |12345678                |55555555                    |GLX              |                     |123456789       |
> >> +-----+-------------+-----------------+------------------------+----------------------------+-----------------+---------------------+----------------+
> >>
> >> org.apache.spark.sql.catalyst.parser.ParseException:
> >> missing STRING at ','(line 2, pos 85)
> >>
> >> == SQL ==
> >>
> >> INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = broadcastValue, brand = dummy)
> >> -------------------------------------------------------------------------------------^^^
> >> SELECT
> >>     ocis_party_id    AS partyId
> >>   , target_mobile_no AS phoneNumber
> >> FROM tmp
> >>
> >> It fails passing the partition values.
> >>
> >> Thanks,
> >>
> >> Dr Mich Talebzadeh
> >>
> >> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >>
> >> http://talebzadehmich.wordpress.com
> >
> > --
> > Patrick McCarthy
> > Senior Data Scientist, Machine Learning Engineering
> > Dstillery
> > 470 Park Ave South, 17th Floor, NYC 10016
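One more inline thought from me: if building SQL text by hand feels brittle, the same load can be done without the parser at all, through the DataFrame writer. A rough sketch, untested, reusing the newDF from the later mail (which already carries the constant broadcastId column) and assuming the table exists as created above:

  // insertInto matches columns by position, so order them exactly as the
  // table is laid out: data columns first, then the partition columns
  // (broadcastId, brand) last.
  newDF.select(
      col("ocis_party_id").cast("string").as("partyId"),
      col("target_mobile_no").cast("string").as("phoneNumber"),
      col("broadcastId"),
      col("brand"))
    .write
    .mode("append")
    .insertInto("michtest.BroadcastStaging")

With hive.exec.dynamic.partition.mode=nonstrict set as above, both partition columns are handled dynamically here, so no quoting of the static value is needed at all.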