Hi All,

I am trying to build a Spark application that reads data from PostgreSQL
(source) in one environment and writes it to PostgreSQL or Aurora (target) in
a different environment (e.g. PROD to QA, or QA to PROD) using Spark JDBC.

When I load the DataFrame into the target DB, I would like the target table to
have the same schema as the source table, using:

val targetTableSchema: String =
  """
    |  operating_unit_nm character varying(20),
    |  organization_id integer,
    |  organization_cd character varying(30),
    |  requesting_organization_id integer,
    |  requesting_organization_cd character varying(50),
    |  owning_organization_id integer,
    |  owning_organization_cd character varying(50)
""".stripMargin


.option("createTableColumnTypes", targetTableSchema )

I would like to know if there is a way to create this targetTableSchema
(source table DDL) variable directly from the source table or from a CSV file.
I don't want Spark to enforce its default schema. Given a table name, how do I
generate the DDL dynamically so that I can pass it to the targetTableSchema
variable as a string?

Currently I am updating targetTableSchema manually and am looking for some
pointers on automating it.
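
A minimal sketch of one way this could be automated, under stated assumptions:
the column metadata has already been fetched from the source database (for
example from information_schema.columns), and only a handful of PostgreSQL
types need mapping. ColumnMeta, ColumnTypes, toSparkType and build are
hypothetical names introduced for illustration; they are not part of Spark.
The Spark documentation states that createTableColumnTypes expects valid Spark
SQL type names, so the sketch emits VARCHAR(n) and INT rather than
"character varying(n)" and "integer".

```scala
// Hypothetical holder for one row of source-column metadata
// (e.g. column_name, data_type, character_maximum_length).
final case class ColumnMeta(name: String, dataType: String, maxLength: Option[Int])

object ColumnTypes {
  // Map a PostgreSQL type name to a Spark SQL type string.
  // Returns None for types this sketch does not cover, so that the column
  // is left out of the option and Spark's default mapping applies to it.
  def toSparkType(col: ColumnMeta): Option[String] =
    col.dataType.toLowerCase match {
      case "character varying" => col.maxLength.map(n => s"VARCHAR($n)")
      case "character"         => col.maxLength.map(n => s"CHAR($n)")
      case "integer"           => Some("INT")
      case "bigint"            => Some("BIGINT")
      case "smallint"          => Some("SMALLINT")
      case _                   => None
    }

  // Build the comma-separated "name TYPE" list for createTableColumnTypes.
  def build(cols: Seq[ColumnMeta]): String =
    cols
      .flatMap(c => toSparkType(c).map(t => s"${c.name} $t"))
      .mkString(", ")
}

// Example with two of the columns from the table above.
val exampleCols = Seq(
  ColumnMeta("operating_unit_nm", "character varying", Some(20)),
  ColumnMeta("organization_id", "integer", None)
)
val generatedSchema: String = ColumnTypes.build(exampleCols)
// generatedSchema == "operating_unit_nm VARCHAR(20), organization_id INT"
```

The ColumnMeta rows could be filled from a query on information_schema.columns
for the given table name, read through spark.read.jdbc with a subquery as the
table argument and then collected to the driver. Note also that the option is
only used when Spark creates the table; with "truncate" set to "true" an
existing target table is truncated rather than recreated.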


Below is my code

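import java.io.{File, FileInputStream}
import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

// Define the parameters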
val sourceDb: String = args(0)
val targetDb: String = args(1)
val sourceTable: String = args(2)
val targetTable: String = args(3)
val sourceEnv: String = args(4)
val targetEnv: String = args(5)

println(s"Arguments Provided: $sourceDb, $targetDb, $sourceTable, " +
  s"$targetTable, $sourceEnv, $targetEnv")

// Define the spark session
val spark: SparkSession = SparkSession
  .builder()
  .appName("Ca-Data-Transporter")
  .master("local")
  .config("driver", "org.postgresql.Driver")
  .getOrCreate()

// define the input directory
val inputDir: String = 
"/Users/gangadharkadam/projects/ca-spark-apps/src/main/resources/"

// Define the source DB properties
val sourceParmFile: String = if (sourceDb == "RDS") {
    "rds-db-parms-" + sourceEnv + ".txt"
  }
  else if (sourceDb == "AURORA") {
    "aws-db-parms-" + sourceEnv + ".txt"
  }
  else if (sourceDb == "GP") {
    "gp-db-parms-" + sourceEnv + ".txt"
  }
  else "NA"

println(sourceParmFile)

val sourceDbParms: Properties = new Properties()
sourceDbParms.load(new FileInputStream(new File(inputDir + sourceParmFile)))
val sourceDbJdbcUrl: String = sourceDbParms.getProperty("jdbcUrl")

println(s"$sourceDb")
println(s"$sourceDbJdbcUrl")

// Define the target DB properties
val targetParmFile: String = if (targetDb == "RDS") {
    "rds-db-parms-" + targetEnv + ".txt"
  }
  else if (targetDb == "AURORA") {
    "aws-db-parms-" + targetEnv + ".txt"
  }
  else if (targetDb == "GP") {
    "gp-db-parms-" + targetEnv + ".txt"
  }
  // fall back to the Aurora parameter file (interpolated with s"...")
  else s"aws-db-parms-$targetEnv.txt"

println(targetParmFile)

val targetDbParms: Properties = new Properties()
targetDbParms.load(new FileInputStream(new File(inputDir + targetParmFile)))
val targetDbJdbcUrl: String = targetDbParms.getProperty("jdbcUrl")

println(s"$targetDb")
println(s"$targetDbJdbcUrl")

// Read the source table as dataFrame
val sourceDF: DataFrame = spark
  .read
  .jdbc(url = sourceDbJdbcUrl,
    table = sourceTable,
    sourceDbParms
  )
  //.filter("site_code is not null")

sourceDF.printSchema()
sourceDF.show()

val sourceDF1 = sourceDF.repartition(
  sourceDF("organization_id")
  //sourceDF("plan_id")
)


val targetTableSchema: String =
  """
    |  operating_unit_nm character varying(20),
    |  organization_id integer,
    |  organization_cd character varying(30),
    |  requesting_organization_id integer,
    |  requesting_organization_cd character varying(50),
    |  owning_organization_id integer,
    |  owning_organization_cd character varying(50)
  """.stripMargin


// write the dataFrame
sourceDF1
  .write
  .option("createTableColumnTypes", targetTableSchema )
  .mode(saveMode = "Overwrite")
  .option("truncate", "true")
  .jdbc(targetDbJdbcUrl, targetTable, targetDbParms)


Thanks!
Gangadhar Kadam
Sr. Data Engineer
M + 1 (401) 588 2269
