I dug up some of my old material on using Spark for ETL.

Regarding the question

"Any reason why Spark's SaveMode doesn't have mode that ignore any Primary
Key/Unique constraint violations?"

Spark has no way of knowing that a PK constraint has been violated until
it receives the error from Oracle over the JDBC connection. In general,
SaveMode is implemented as follows:

import org.apache.spark.sql.SaveMode

val saveMode = SaveMode.Append

In practice only SaveMode.Overwrite and SaveMode.Append are useful here.
SaveMode.Ignore operates at the table level, not the row level: if the
target table already exists, the entire write is silently skipped, so it
cannot be used to skip individual rows that violate a constraint.
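
A minimal sketch of that table-level behaviour (df, url and props here
are illustrative placeholders, not variables from this thread):

import org.apache.spark.sql.SaveMode

// If the target table already exists in Oracle, this write is a no-op:
// no rows are inserted and no per-row conflict handling takes place.
df.write.mode(SaveMode.Ignore).jdbc(url, "scratchpad.dummy", props)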

However, one can exclude records that already exist in Oracle by reading
the Oracle table into a DF (as in my previous mail) and filtering out the
records whose PK is already present in Oracle. This can be done in SQL
itself by creating a tempView on top of each of your Oracle DF and
Cassandra DF. Again, ID is the PK constraint on the Oracle table:

// Find the IDs that do not exist in Oracle (i.e. the new records).
// FYI, dfdummy2 is your Cassandra DF and s is your Oracle DF

dfdummy2.createOrReplaceTempView("dfdummy2")
s.createOrReplaceTempView("s")
// Create a left outer join between the two DFs in SQL; rows where s.ID
// is NULL are the ones that exist only in Cassandra
var sqltext = """select dfdummy2.ID, CLUSTERED, SCATTERED, RANDOMISED,
RANDOM_STRING, SMALL_VC, PADDING FROM dfdummy2 LEFT OUTER JOIN s ON
dfdummy2.ID = s.ID WHERE s.ID IS NULL ORDER BY dfdummy2.ID"""
sql(sqltext).count()
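
If you prefer the DataFrame API, a left anti join gives the same result
set (a sketch, assuming both DFs expose the ID column under that name):

// Keep only the Cassandra rows whose ID has no match in the Oracle DF
val newRows = dfdummy2.join(s, Seq("ID"), "left_anti")
newRows.count()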

// Write the result set (the new records) to the Oracle table

import java.util.Properties

val connectionProperties = new Properties
connectionProperties.put("user", _username)
connectionProperties.put("password", _password)
// "driver" is the JDBC option key Spark recognises for the driver class;
// the URL itself is passed directly to jdbc() below
connectionProperties.put("driver", driverName)

// (Optional) broadcast the connection properties; only needed if code
// running on the executors opens its own JDBC connections. The
// DataFrameWriter call below does not require it.
val brConnect = sc.broadcast(connectionProperties)

val saveMode = SaveMode.Append
sql(sqltext).write.mode(saveMode).jdbc(_ORACLEserver,
_dbschema+"."+_dbtable, connectionProperties)
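
The same write can also be expressed through the DataFrameWriter options
API, which keeps the URL, table and driver class in one place (a sketch
using the same variables as above):

sql(sqltext).write.
  format("jdbc").
  option("url", _ORACLEserver).
  option("dbtable", _dbschema + "." + _dbtable).
  option("user", _username).
  option("password", _password).
  option("driver", driverName).
  mode(SaveMode.Append).
  save()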

HTH

Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 20 Jul 2019 at 08:13, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> JDBC reads from an Oracle table require the Oracle JDBC driver ojdbc6.jar
> or higher. ojdbc6.jar works for Oracle 11g and 12c, added via --jars
> <PATH>/ojdbc6.jar
>
> Example of a parallel read (4 connections) from Oracle, with ID being the
> PK on the Oracle table:
>
> var _ORACLEserver= "jdbc:oracle:thin:@rhes564:1521:mydb12"
> var _username = "scratchpad"
> var _password = "xxxx"
> //
> //
> // Get minID and maxID first
> //
> val minID = HiveContext.read.format("jdbc").options(Map("url" ->
> _ORACLEserver,"dbtable" -> "(SELECT cast(MIN(ID) AS INT) AS minID FROM
> scratchpad.dummy)",
>        "user" -> _username, "password" ->
> _password)).load().collect.apply(0).getDecimal(0).toString
> val maxID = HiveContext.read.format("jdbc").options(Map("url" ->
> _ORACLEserver,"dbtable" -> "(SELECT cast(MAX(ID) AS INT) AS maxID FROM
> scratchpad.dummy)",
>        "user" -> _username, "password" ->
> _password)).load().collect.apply(0).getDecimal(0).toString
> val s = HiveContext.read.format("jdbc").options(
>        Map("url" -> _ORACLEserver,
>        "dbtable" -> "(SELECT ID, CLUSTERED, SCATTERED, RANDOMISED,
> RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
>        "partitionColumn" -> "ID",
>        "lowerBound" -> minID,
>        "upperBound" -> maxID,
>        "numPartitions" -> "4",
>        "user" -> _username,
>        "password" -> _password)).load
>
> HTH
>
> Dr Mich Talebzadeh
>
>
> On Sat, 20 Jul 2019 at 07:42, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> This behaviour is governed by the underlying RDBMS: a bulk insert either
>> commits or rolls back as a whole.
>>
>> You can insert the new rows into a staging table in Oracle (which is
>> common in ETL) and then insert/select into the target Oracle table in a
>> shell routine.
>>
>> The other way is to use JDBC in Spark to read the Oracle table into a DF,
>> compare that Oracle DF against your DF to work out which records are new,
>> and insert only those records into Oracle.
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>> On Sat, 20 Jul 2019 at 05:35, Richard <fifistorm...@gmail.com> wrote:
>>
>>> Any reason why Spark's SaveMode doesn't have a mode that ignores any
>>> Primary Key/Unique constraint violations?
>>>
>>> Let's say I'm using Spark to migrate some data from Cassandra to Oracle.
>>> I want the insert operation to be "ignore if the primary key exists"
>>> instead of failing the whole batch.
>>>
>>> Thanks,
>>> Richard
>>>
>>>
