I dug out some of my old code that used Spark for ETL. Regarding the question

"Any reason why Spark's SaveMode doesn't have a mode that ignores any Primary Key/Unique constraint violations?"

Spark cannot determine whether a PK constraint is violated until it receives the error from Oracle through the JDBC connection. In general, SaveMode is used as follows:

import org.apache.spark.sql.SaveMode
val saveMode = SaveMode.Append

Only SaveMode.Overwrite and SaveMode.Append are useful here. The other modes, such as Ignore, act on whether the target table already exists, not on individual rows, so they do not skip duplicate keys.

However, you can exclude records that already exist in Oracle: read the Oracle table into a DF as in my previous mail, then drop the records whose PK is already in Oracle. This can be done in SQL itself by creating a temp view on top of your Oracle DF and your Cassandra DF. Again, ID is the PK on the Oracle table.

// Find the IDs that do not yet exist in Oracle (i.e. new records).
// FYI, dfdummy2 is your Cassandra DF and s is your Oracle DF
dfdummy2.createOrReplaceTempView("dfdummy2")
s.createOrReplaceTempView("s")
// Left outer join between the two DFs in SQL; rows with no match in s are new.
// Columns are qualified with d because both views carry the same column names.
val sqltext = """
select d.ID, d.CLUSTERED, d.SCATTERED, d.RANDOMISED, d.RANDOM_STRING, d.SMALL_VC, d.PADDING
FROM dfdummy2 d
LEFT OUTER JOIN s ON d.ID = s.ID
WHERE s.ID IS NULL
ORDER BY d.ID
"""
spark.sql(sqltext).count()

// Write the result set (new rows only) to the Oracle table.
// _dbschema, _dbtable and driverName are assumed to be defined earlier in the session.
import java.util.Properties
val connectionProperties = new Properties
connectionProperties.setProperty("user", _username)
connectionProperties.setProperty("password", _password)
// "driver" is the property Spark's JDBC source reads; the URL is passed to jdbc() directly.
connectionProperties.setProperty("driver", driverName)
// No broadcast is needed: Spark ships the connection properties with the write tasks.
spark.sql(sqltext).write.mode(saveMode).jdbc(_ORACLEserver, _dbschema + "." + _dbtable, connectionProperties)

HTH

Dr Mich Talebzadeh

LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com
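The exclusion step described above (keep only the rows whose ID is not already in Oracle) can also be written with the DataFrame API instead of SQL. The following is a minimal sketch, not the code from the original job: the two small DataFrames and the names cassandraDF, oracleDF and newRows are hypothetical stand-ins for dfdummy2 and s, and a local SparkSession is created only so the snippet runs on its own.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("anti-join-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-ins for the Cassandra DF (dfdummy2) and the Oracle DF (s).
val cassandraDF = Seq((1, "a"), (2, "b"), (3, "c")).toDF("ID", "PADDING")
val oracleDF    = Seq((1, "a")).toDF("ID", "PADDING")

// A left_anti join keeps the rows of cassandraDF whose ID has no match
// in oracleDF, i.e. the rows that are safe to append.
// Only the key column of the Oracle side is needed for the comparison.
val newRows = cassandraDF.join(oracleDF.select("ID"), Seq("ID"), "left_anti")

// Here this leaves the rows with ID 2 and 3.
newRows.show()
```

The resulting newRows could then be written with SaveMode.Append through the same jdbc() call as in the SQL version. Note that this only filters against the snapshot of Oracle that was read; rows inserted into Oracle by another session in the meantime can still trigger a constraint violation.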
*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Sat, 20 Jul 2019 at 08:13, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> A JDBC read from an Oracle table requires the Oracle JDBC driver ojdbc6.jar
> or higher. ojdbc6.jar works for Oracle 11g and 12c; add it with
> --jars <PATH>/ojdbc6.jar
>
> Example with a parallel read (4 connections) from Oracle, with ID being your
> PK in the Oracle table:
>
> var _ORACLEserver = "jdbc:oracle:thin:@rhes564:1521:mydb12"
> var _username = "scratchpad"
> var _password = "xxxx"
> //
> // Get minID and maxID first; they become the partition bounds below.
> // HiveContext is assumed to be an existing HiveContext instance in the session.
> //
> val minID = HiveContext.read.format("jdbc").options(Map(
>   "url" -> _ORACLEserver,
>   "dbtable" -> "(SELECT cast(MIN(ID) AS INT) AS minID FROM scratchpad.dummy)",
>   "user" -> _username,
>   "password" -> _password)).load().collect.apply(0).getDecimal(0).toString
> val maxID = HiveContext.read.format("jdbc").options(Map(
>   "url" -> _ORACLEserver,
>   "dbtable" -> "(SELECT cast(MAX(ID) AS INT) AS maxID FROM scratchpad.dummy)",
>   "user" -> _username,
>   "password" -> _password)).load().collect.apply(0).getDecimal(0).toString
> // Read the table over 4 connections, each covering a range of ID values
> val s = HiveContext.read.format("jdbc").options(Map(
>   "url" -> _ORACLEserver,
>   "dbtable" -> "(SELECT ID, CLUSTERED, SCATTERED, RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
>   "partitionColumn" -> "ID",
>   "lowerBound" -> minID,
>   "upperBound" -> maxID,
>   "numPartitions" -> "4",
>   "user" -> _username,
>   "password" -> _password)).load
>
> HTH
>
> On Sat, 20 Jul 2019 at 07:42, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> This behaviour is governed by the underlying RDBMS for bulk inserts, where
>> the batch either commits or rolls back.
>>
>> You can insert the new rows into a staging table in Oracle (which is common
>> in ETL) and then do an insert/select from it into the target Oracle table
>> in a shell routine.
>>
>> The other way is to use JDBC in Spark to read the Oracle table into a DF,
>> compare it with your DF, and insert only the records that are missing
>> from Oracle.
>>
>> HTH
>>
>> On Sat, 20 Jul 2019 at 05:35, Richard <fifistorm...@gmail.com> wrote:
>>
>>> Any reason why Spark's SaveMode doesn't have a mode that ignores any
>>> Primary Key/Unique constraint violations?
>>>
>>> Let's say I'm using Spark to migrate some data from Cassandra to Oracle.
>>> I want the insert operation to be "ignore if the primary key exists"
>>> instead of failing the whole batch.
>>>
>>> Thanks,
>>> Richard