Hi All,
I am trying to build a Spark application which reads data from PostgreSQL
(source) in one environment and writes it to PostgreSQL/Aurora (target) in a
different environment (e.g., PROD to QA or QA to PROD) using Spark JDBC.
When loading the DataFrame into the target DB, I would like to enforce the
same schema as the source table by defining the column types explicitly:
val targetTableSchema: String =
"""
| operating_unit_nm character varying(20),
| organization_id integer,
| organization_cd character varying(30),
| requesting_organization_id integer,
| requesting_organization_cd character varying(50),
| owning_organization_id integer,
| owning_organization_cd character varying(50)
""".stripMargin
and then passing that string to the writer via .option("createTableColumnTypes", targetTableSchema).
I would like to know if there is a way to build this targetTableSchema (the
source table DDL) directly from the source table or from a CSV file; I don't
want Spark to fall back on its default type mapping. Given just the table
name, how do I generate the DDL dynamically and pass it to the
targetTableSchema variable as a string?
Currently I am updating targetTableSchema manually and am looking for
pointers on how to automate it.
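One idea I have been sketching (not yet tested) is to read the column
definitions from information_schema.columns on the source database over the
same JDBC connection and assemble the string from that. Something roughly
like the helper below, where buildColumnTypes is just a name I made up and I
am assuming an unqualified table name:

// Rough sketch: build the createTableColumnTypes string from the source
// table's definition in information_schema.columns (PostgreSQL)
def buildColumnTypes(spark: SparkSession,
                     jdbcUrl: String,
                     connProps: Properties,
                     table: String): String = {
  // Push the metadata query down as a JDBC subquery (needs an alias)
  val columnsQuery =
    s"""(SELECT column_name, data_type, character_maximum_length
       |   FROM information_schema.columns
       |  WHERE table_name = '$table'
       |  ORDER BY ordinal_position) AS cols""".stripMargin

  val colsDF = spark.read.jdbc(jdbcUrl, columnsQuery, connProps)

  colsDF.collect()
    .map { row =>
      val name   = row.getAs[String]("column_name")
      val dtype  = row.getAs[String]("data_type")
      val maxLen = Option(row.getAs[Any]("character_maximum_length"))
      maxLen match {
        case Some(len) => s"$name $dtype($len)"  // e.g. organization_cd character varying(30)
        case None      => s"$name $dtype"        // e.g. organization_id integer
      }
    }
    .mkString(",\n")
}

I am not sure whether this is the recommended way, or whether there is
something built in that I am missing.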
Below is my current code:
import java.io.{File, FileInputStream}
import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

// Define the parameters
val sourceDb: String = args(0)
val targetDb: String = args(1)
val sourceTable: String = args(2)
val targetTable: String = args(3)
val sourceEnv: String = args(4)
val targetEnv: String = args(5)
println(s"Arguments provided: $sourceDb, $targetDb, $sourceTable, $targetTable, $sourceEnv, $targetEnv")
// Define the Spark session
val spark: SparkSession = SparkSession
  .builder()
  .appName("Ca-Data-Transporter")
  .master("local")
  .getOrCreate()
// Define the input directory
val inputDir: String =
"/Users/gangadharkadam/projects/ca-spark-apps/src/main/resources/"
// Define the source DB properties
val sourceParmFile: String = sourceDb match {
  case "RDS"    => s"rds-db-parms-$sourceEnv.txt"
  case "AURORA" => s"aws-db-parms-$sourceEnv.txt"
  case "GP"     => s"gp-db-parms-$sourceEnv.txt"
  case _        => "NA"
}
println(sourceParmFile)
val sourceDbParms: Properties = new Properties()
sourceDbParms.load(new FileInputStream(new File(inputDir + sourceParmFile)))
// The JDBC driver class goes in the connection properties, not the Spark config
sourceDbParms.setProperty("driver", "org.postgresql.Driver")
val sourceDbJdbcUrl: String = sourceDbParms.getProperty("jdbcUrl")
println(s"Source DB: $sourceDb")
println(s"Source JDBC URL: $sourceDbJdbcUrl")
// Define the target DB properties
val targetParmFile: String = targetDb match {
  case "RDS"    => s"rds-db-parms-$targetEnv.txt"
  case "AURORA" => s"aws-db-parms-$targetEnv.txt"
  case "GP"     => s"gp-db-parms-$targetEnv.txt"
  case _        => s"aws-db-parms-$targetEnv.txt"
}
println(targetParmFile)
val targetDbParms: Properties = new Properties()
targetDbParms.load(new FileInputStream(new File(inputDir + targetParmFile)))
targetDbParms.setProperty("driver", "org.postgresql.Driver")
val targetDbJdbcUrl: String = targetDbParms.getProperty("jdbcUrl")
println(s"Target DB: $targetDb")
println(s"Target JDBC URL: $targetDbJdbcUrl")
// Read the source table as dataFrame
val sourceDF: DataFrame = spark
  .read
  .jdbc(url = sourceDbJdbcUrl, table = sourceTable, properties = sourceDbParms)
  //.filter("site_code is not null")
sourceDF.printSchema()
sourceDF.show()
val sourceDF1 = sourceDF.repartition(
sourceDF("organization_id")
//sourceDF("plan_id")
)
val targetTableSchema: String =
"""
| operating_unit_nm character varying(20),
| organization_id integer,
| organization_cd character varying(30),
| requesting_organization_id integer,
| requesting_organization_cd character varying(50),
| owning_organization_id integer,
| owning_organization_cd character varying(50)
""".stripMargin
// write the dataFrame
sourceDF1
.write
.option("createTableColumnTypes", targetTableSchema)
.mode("overwrite")
.option("truncate", "true")
.jdbc(targetDbJdbcUrl, targetTable, targetDbParms)
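If something along the lines of the information_schema idea works, I would
then just swap the hard-coded targetTableSchema above for the generated one,
e.g. (buildColumnTypes being the sketch from earlier in this mail):

// Hypothetical replacement for the hard-coded schema string
val targetTableSchema: String =
  buildColumnTypes(spark, sourceDbJdbcUrl, sourceDbParms, sourceTable)

and keep the rest of the write step unchanged.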
Thanks!
Gangadhar Kadam
Sr. Data Engineer
M + 1 (401) 588 2269