One option is to use plain JDBC to interrogate the PostgreSQL catalog for the 
source table and generate the DDL to create the destination table.
Then, using plain JDBC again, create the table at the destination.

See the link below for some pointers:

https://stackoverflow.com/questions/2593803/how-to-generate-the-create-table-sql-statement-for-an-existing-table-in-postgr
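For example, here is a minimal sketch of that approach, assuming the standard
information_schema layout (the helper name and the way the type string is
assembled are illustrative, not from any library):

    import java.sql.DriverManager

    // Build a "col1 type1, col2 type2, ..." string for a table by reading
    // information_schema.columns over plain JDBC. Only varchar lengths are
    // handled here; numeric precision/scale etc. would need similar treatment.
    def columnTypes(jdbcUrl: String, user: String, password: String,
                    table: String): String = {
      val conn = DriverManager.getConnection(jdbcUrl, user, password)
      try {
        val ps = conn.prepareStatement(
          """SELECT column_name, data_type, character_maximum_length
            |FROM information_schema.columns
            |WHERE table_name = ?
            |ORDER BY ordinal_position""".stripMargin)
        ps.setString(1, table)
        val rs = ps.executeQuery()
        val cols = scala.collection.mutable.ListBuffer[String]()
        while (rs.next()) {
          val name   = rs.getString("column_name")
          val typ    = rs.getString("data_type")                // e.g. "character varying"
          val maxLen = rs.getObject("character_maximum_length") // null unless a char type
          cols += (if (maxLen != null) s"$name $typ($maxLen)" else s"$name $typ")
        }
        cols.mkString(",\n")
      } finally conn.close()
    }

The resulting string can be passed to .option("createTableColumnTypes", ...),
or wrapped in "CREATE TABLE <target> ( ... )" and executed with a plain
Statement against the target connection to pre-create the table. Add a
table_schema predicate to the query if the table name is not unique across
schemas.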


On 7/11/18, 9:55 PM, "Kadam, Gangadhar (GE Aviation, Non-GE)" 
<gangadhar.ka...@ge.com> wrote:

    Hi All,
    
    I am trying to build a Spark application which will read data from 
PostgreSQL (source) in one environment and write it to PostgreSQL/Aurora 
(target) in a different environment (e.g. PROD to QA or QA to PROD) using 
Spark JDBC.
    
    When I am loading the DataFrame back to the target DB, I would like to 
ensure it has the same schema as the source table, using
    
    val targetTableSchema: String =
      """
        |  operating_unit_nm character varying(20),
        |  organization_id integer,
        |  organization_cd character varying(30),
        |  requesting_organization_id integer,
        |  requesting_organization_cd character varying(50),
        |  owning_organization_id integer,
        |  owning_organization_cd character varying(50)
    """.stripMargin
    
    
    .option("createTableColumnTypes", targetTableSchema)
    
    I would like to know if there is a way I can create this targetTableSchema 
(source table DDL) variable directly from the source table or from a CSV file. 
I don't want Spark to enforce its default schema. Based on the table name, how 
do I get the DDL created dynamically to pass it to the targetTableSchema 
variable as a string?
    
    Currently I am updating targetTableSchema manually and am looking for some 
pointers to automate it.
    
    
    Below is my code:
    
    // Define the parameters
    val sourceDb: String = args(0)
    val targetDb: String = args(1)
    val sourceTable: String = args(2)
    val targetTable: String = args(3)
    val sourceEnv: String = args(4)
    val targetEnv: String = args(5)
    
    println(s"Arguments Provided: $sourceDb, $targetDb, $sourceTable, " +
      s"$targetTable, $sourceEnv, $targetEnv")
    
    // Define the spark session
    val spark: SparkSession = SparkSession
      .builder()
      .appName("Ca-Data-Transporter")
      .master("local")
      // note: Spark's JDBC source reads the driver class from the connection
      // options/Properties, not from a SparkSession config, so this has no effect
      .config("driver", "org.postgresql.Driver")
      .getOrCreate()
    
    // define the input directory
    val inputDir: String = 
"/Users/gangadharkadam/projects/ca-spark-apps/src/main/resources/"
    
    // Define the source DB properties
    val sourceParmFile: String = if (sourceDb == "RDS") {
        "rds-db-parms-" + sourceEnv + ".txt"
      }
      else if (sourceDb == "AURORA") {
        "aws-db-parms-" + sourceEnv + ".txt"
      }
      else if (sourceDb == "GP") {
        "gp-db-parms-" + sourceEnv + ".txt"
      }
      else "NA"
    
    println(sourceParmFile)
    
    val sourceDbParms: Properties = new Properties()
    sourceDbParms.load(new FileInputStream(new File(inputDir + sourceParmFile)))
    val sourceDbJdbcUrl: String = sourceDbParms.getProperty("jdbcUrl")
    
    println(s"$sourceDb")
    println(s"$sourceDbJdbcUrl")
    
    // Define the target DB properties
    val targetParmFile: String = if (targetDb == "RDS") {
        "rds-db-parms-" + targetEnv + ".txt"
      }
      else if (targetDb == "AURORA") {
        "aws-db-parms-" + targetEnv + ".txt"
      }
      else if (targetDb == "GP") {
        "gp-db-parms-" + targetEnv + ".txt"
      // the s prefix is required for $targetEnv to be interpolated
      } else s"aws-db-parms-$targetEnv.txt"
    
    println(targetParmFile)
    
    val targetDbParms: Properties = new Properties()
    targetDbParms.load(new FileInputStream(new File(inputDir + targetParmFile)))
    val targetDbJdbcUrl: String = targetDbParms.getProperty("jdbcUrl")
    
    println(s"$targetDb")
    println(s"$targetDbJdbcUrl")
    
    // Read the source table as dataFrame
    val sourceDF: DataFrame = spark
      .read
      .jdbc(url = sourceDbJdbcUrl,
        table = sourceTable,
        // positional arguments may not follow named ones, so name this too
        properties = sourceDbParms
      )
      //.filter("site_code is not null")
    
    sourceDF.printSchema()
    sourceDF.show()
    
    val sourceDF1 = sourceDF.repartition(
      sourceDF("organization_id")
      //sourceDF("plan_id")
    )
    
    
    val targetTableSchema: String =
      """
        |  operating_unit_nm character varying(20),
        |  organization_id integer,
        |  organization_cd character varying(30),
        |  requesting_organization_id integer,
        |  requesting_organization_cd character varying(50),
        |  owning_organization_id integer,
        |  owning_organization_cd character varying(50)
      """.stripMargin
    
    
    // write the dataFrame
    sourceDF1
      .write
      .option("createTableColumnTypes", targetTableSchema)
      .mode("overwrite")
      // with truncate=true an existing table is truncated, not dropped, so
      // createTableColumnTypes only applies when the table is first created
      .option("truncate", "true")
      .jdbc(targetDbJdbcUrl, targetTable, targetDbParms)
    
    
    Thanks!
    Gangadhar Kadam
    Sr. Data Engineer
    M + 1 (401) 588 2269
    


---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
