Hi all,

I am building a Spark application that reads data from PostgreSQL (source) in one environment and writes it to PostgreSQL/Aurora (target) in a different environment (e.g., PROD to QA or QA to PROD) using Spark JDBC.
When I load the DataFrame back into the target DB, I would like to enforce the same schema as the source table by passing a DDL string through the createTableColumnTypes option:

val targetTableSchema: String =
  """|operating_unit_nm character varying(20),
     |organization_id integer,
     |organization_cd character varying(30),
     |requesting_organization_id integer,
     |requesting_organization_cd character varying(50),
     |owning_organization_id integer,
     |owning_organization_cd character varying(50)""".stripMargin

.option("createTableColumnTypes", targetTableSchema)

Is there a way to build this targetTableSchema string (essentially the source table DDL) dynamically, either directly from the source table or from a CSV file? I don't want Spark to fall back on its default type mappings. Given only the table name, how can I generate the DDL and pass it to the targetTableSchema variable as a string? Today I update targetTableSchema by hand and am looking for pointers on automating it.

Below is my code:

import java.io.{File, FileInputStream}
import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

// Command-line parameters
val sourceDb: String = args(0)
val targetDb: String = args(1)
val sourceTable: String = args(2)
val targetTable: String = args(3)
val sourceEnv: String = args(4)
val targetEnv: String = args(5)

println(s"Arguments provided: $sourceDb, $targetDb, $sourceTable, $targetTable, $sourceEnv, $targetEnv")

// Spark session
val spark: SparkSession = SparkSession
  .builder()
  .appName("Ca-Data-Transporter")
  .master("local")
  .config("driver", "org.postgresql.Driver")
  .getOrCreate()

// Directory holding the per-environment connection property files
val inputDir: String = "/Users/gangadharkadam/projects/ca-spark-apps/src/main/resources/"

// Source DB properties
val sourceParmFile: String = sourceDb match {
  case "RDS"    => s"rds-db-parms-$sourceEnv.txt"
  case "AURORA" => s"aws-db-parms-$sourceEnv.txt"
  case "GP"     => s"gp-db-parms-$sourceEnv.txt"
  case _        => "NA"
}
println(sourceParmFile)

val sourceDbParms: Properties = new Properties()
sourceDbParms.load(new FileInputStream(new File(inputDir + sourceParmFile)))
val sourceDbJdbcUrl: String = sourceDbParms.getProperty("jdbcUrl")

println(sourceDb)
println(sourceDbJdbcUrl)

// Target DB properties
val targetParmFile: String = targetDb match {
  case "RDS"    => s"rds-db-parms-$targetEnv.txt"
  case "AURORA" => s"aws-db-parms-$targetEnv.txt"
  case "GP"     => s"gp-db-parms-$targetEnv.txt"
  case _        => s"aws-db-parms-$targetEnv.txt"
}
println(targetParmFile)

val targetDbParms: Properties = new Properties()
targetDbParms.load(new FileInputStream(new File(inputDir + targetParmFile)))
val targetDbJdbcUrl: String = targetDbParms.getProperty("jdbcUrl")

println(targetDb)
println(targetDbJdbcUrl)

// Read the source table as a DataFrame
val sourceDF: DataFrame = spark
  .read
  .jdbc(url = sourceDbJdbcUrl, table = sourceTable, sourceDbParms)
  //.filter("site_code is not null")

sourceDF.printSchema()
sourceDF.show()

val sourceDF1 = sourceDF.repartition(
  sourceDF("organization_id")
  //sourceDF("plan_id")
)

// This is the part I currently maintain by hand
val targetTableSchema: String =
  """|operating_unit_nm character varying(20),
     |organization_id integer,
     |organization_cd character varying(30),
     |requesting_organization_id integer,
     |requesting_organization_cd character varying(50),
     |owning_organization_id integer,
     |owning_organization_cd character varying(50)""".stripMargin

// Write the DataFrame. Note that createTableColumnTypes only takes effect
// when Spark actually creates the table; with truncate=true an existing
// table is truncated and kept as-is.
sourceDF1
  .write
  .option("createTableColumnTypes", targetTableSchema)
  .mode("overwrite")
  .option("truncate", "true")
  .jdbc(targetDbJdbcUrl, targetTable, targetDbParms)
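One direction I have been exploring is to query information_schema.columns on the source database over plain JDBC and assemble the DDL string from the result; the same column/type pairs could equally be loaded from a CSV file instead. Below is a rough, untested sketch of that idea. buildTargetTableSchema is a hypothetical helper of my own: it only covers simple types such as integer and character varying, ignores numeric precision/scale and array types, assumes the table name is not schema-qualified, and assumes the properties file carries user/password keys the Postgres driver understands. Is this a reasonable approach, or is there a built-in way to do it?

import java.sql.DriverManager
import java.util.Properties
import scala.collection.mutable.ListBuffer

// Hypothetical helper: derive the createTableColumnTypes string from the
// source table's definition in information_schema.columns.
def buildTargetTableSchema(jdbcUrl: String, props: Properties, table: String): String = {
  val conn = DriverManager.getConnection(jdbcUrl, props)
  try {
    val stmt = conn.prepareStatement(
      """SELECT column_name, data_type, character_maximum_length
        |FROM information_schema.columns
        |WHERE table_name = ?
        |ORDER BY ordinal_position""".stripMargin)
    stmt.setString(1, table)
    val rs = stmt.executeQuery()
    val columns = ListBuffer[String]()
    while (rs.next()) {
      val name   = rs.getString("column_name")
      val dbType = rs.getString("data_type")                  // e.g. "integer", "character varying"
      val maxLen = rs.getObject("character_maximum_length")   // null for non-character types
      val typeDdl = if (maxLen != null) s"$dbType($maxLen)" else dbType
      columns += s"$name $typeDdl"
    }
    columns.mkString(",\n")
  } finally conn.close()
}

// Would replace the hard-coded literal above:
val targetTableSchema: String =
  buildTargetTableSchema(sourceDbJdbcUrl, sourceDbParms, sourceTable)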
Thanks!

Gangadhar Kadam
Sr. Data Engineer
M +1 (401) 588 2269