Hi All,

I am trying to build a Spark application that reads data from PostgreSQL 
(source) in one environment and writes it to PostgreSQL or Aurora (target) 
in a different environment (for example PROD to QA, or QA to PROD) using 
Spark JDBC.

When I load the DataFrame into the target DB, I would like the target table 
to have the same schema as the source table, using:

val targetTableSchema: String =
  """
    |  operating_unit_nm character varying(20),
    |  organization_id integer,
    |  organization_cd character varying(30),
    |  requesting_organization_id integer,
    |  requesting_organization_cd character varying(50),
    |  owning_organization_id integer,
    |  owning_organization_cd character varying(50)
  """.stripMargin

.option("createTableColumnTypes", targetTableSchema )

I would like to know if there is a way to create this targetTableSchema 
variable (the source table DDL) directly from the source table or from a CSV 
file. I don’t want Spark to enforce its default schema. Given the table name, 
how do I generate the DDL dynamically and pass it to the targetTableSchema 
variable as a string?
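
To make the question concrete, here is a minimal sketch of the kind of helper I have in mind. It assumes the column metadata is available as rows of (column name, PostgreSQL type name, max length), for example as CSV lines. The names SchemaDdl, ColumnMeta, fromCsvLine and toColumnTypes are hypothetical, and the type mapping covers only a few types. As far as I understand the Spark documentation, createTableColumnTypes expects Spark SQL type names such as VARCHAR(20), so the sketch emits those rather than "character varying(20)".

```scala
// Hypothetical helper, not part of the job below; names and mapping are assumptions.
object SchemaDdl {
  // One row of column metadata, e.g. parsed from a CSV line
  // "column_name,data_type,max_length" (max_length may be empty).
  final case class ColumnMeta(name: String, dataType: String, maxLength: Option[Int])

  // Parse one CSV line into ColumnMeta; an empty third field means "no length".
  def fromCsvLine(line: String): ColumnMeta = {
    val f = line.split(",", -1).map(_.trim).toList
    ColumnMeta(f(0), f(1), f.lift(2).filter(_.nonEmpty).map(_.toInt))
  }

  // Map a PostgreSQL type name to a Spark SQL type name.
  // Unmapped types return None so Spark's default is kept for that column.
  def sparkType(c: ColumnMeta): Option[String] = c.dataType.toLowerCase match {
    case "character varying" => c.maxLength.map(n => s"VARCHAR($n)")
    case "character"         => c.maxLength.map(n => s"CHAR($n)")
    case "integer"           => Some("INT")
    case "bigint"            => Some("BIGINT")
    case _                   => None
  }

  // Build the comma-separated "name TYPE" list for createTableColumnTypes.
  def toColumnTypes(cols: Seq[ColumnMeta]): String =
    cols.flatMap(c => sparkType(c).map(t => s"${c.name} $t")).mkString(", ")
}
```

For the two input lines "operating_unit_nm,character varying,20" and "organization_id,integer," this yields the string "operating_unit_nm VARCHAR(20), organization_id INT".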

Currently I update targetTableSchema manually, and I am looking for pointers 
on how to automate it.
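
One possible source for the metadata is information_schema.columns on the source database. The sketch below only builds the subquery text. SourceColumns and columnQuery are hypothetical names, and it assumes the schema and table names are passed separately and are plain identifiers.

```scala
// Hypothetical helper; the schema/table split and identifier rule are assumptions.
object SourceColumns {
  // Accept only simple identifiers, since the names are spliced into SQL text.
  private val Identifier = "[A-Za-z_][A-Za-z0-9_]*"

  // Build an aliased subquery that lists a table's columns in declaration order.
  def columnQuery(schema: String, table: String): String = {
    require(schema.matches(Identifier) && table.matches(Identifier),
      "schema and table must be simple identifiers")
    s"""(select column_name, data_type, character_maximum_length
       |   from information_schema.columns
       |  where table_schema = '$schema' and table_name = '$table'
       |  order by ordinal_position) as src_columns""".stripMargin
  }
}
```

Because the result is an aliased subquery, it could be passed as the table argument of spark.read.jdbc against the source URL. The few rows it returns could then be collected on the driver and formatted into the createTableColumnTypes string.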

Below is my code

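import java.io.{File, FileInputStream}
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

// Define the parameters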
val sourceDb: String = args(0)
val targetDb: String = args(1)
val sourceTable: String = args(2)
val targetTable: String = args(3)
val sourceEnv: String = args(4)
val targetEnv: String = args(5)

println(s"Arguments provided: $sourceDb, $targetDb, $sourceTable, $targetTable, " +
  s"$sourceEnv, $targetEnv")

// Define the spark session
val spark: SparkSession = SparkSession
  .builder()
  .config("driver", "org.postgresql.Driver")
  .getOrCreate()

// define the input directory
val inputDir: String = 

// Define the source DB properties
val sourceParmFile: String = if (sourceDb == "RDS") {
    "rds-db-parms-" + sourceEnv + ".txt"
  } else if (sourceDb == "AURORA") {
    "aws-db-parms-" + sourceEnv + ".txt"
  } else if (sourceDb == "GP") {
    "gp-db-parms-" + sourceEnv + ".txt"
  } else "NA"


val sourceDbParms: Properties = new Properties()
sourceDbParms.load(new FileInputStream(new File(inputDir + sourceParmFile)))
val sourceDbJdbcUrl: String = sourceDbParms.getProperty("jdbcUrl")


// Define the target DB properties
val targetParmFile: String = if (targetDb == "RDS") {
    "rds-db-parms-" + targetEnv + ".txt"
  } else if (targetDb == "AURORA") {
    "aws-db-parms-" + targetEnv + ".txt"
  } else if (targetDb == "GP") {
    "gp-db-parms-" + targetEnv + ".txt"
  } else s"aws-db-parms-$targetEnv.txt"


val targetDbParms: Properties = new Properties()
targetDbParms.load(new FileInputStream(new File(inputDir + targetParmFile)))
val targetDbJdbcUrl: String = targetDbParms.getProperty("jdbcUrl")


// Read the source table as dataFrame
val sourceDF: DataFrame = spark.read
  .jdbc(url = sourceDbJdbcUrl,
    table = sourceTable,
    properties = sourceDbParms)
  //.filter("site_code is not null")


val sourceDF1 = sourceDF.repartition(

val targetTableSchema: String =
  """
    |  operating_unit_nm character varying(20),
    |  organization_id integer,
    |  organization_cd character varying(30),
    |  requesting_organization_id integer,
    |  requesting_organization_cd character varying(50),
    |  owning_organization_id integer,
    |  owning_organization_cd character varying(50)
  """.stripMargin

// Write the DataFrame to the target table
sourceDF1.write
  .option("createTableColumnTypes", targetTableSchema)
  .mode(saveMode = "Overwrite")
  .option("truncate", "true")
  .jdbc(targetDbJdbcUrl, targetTable, targetDbParms)

Gangadhar Kadam
Sr. Data Engineer
M + 1 (401) 588 2269
