Hi Richard,

You can use the following to read JSON data into a DataFrame. This example
reads JSON from a Kafka topic:

         val sc = spark.sparkContext
         import spark.implicits._
         // pricesRDD holds the (key, value) pairs read from Kafka.
         // Use map to create a new RDD from the value portion of each pair.
         val jsonRDD = pricesRDD.map(x => x._2)
         // Create a DataFrame from jsonRDD
         val jsonDF = sqlContext.read.json(jsonRDD)
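
For anyone without the Kafka stream at hand, the same step can be tried on made-up data. This is only a sketch: it assumes Spark 2.2 or later, where `spark.read.json` accepts a `Dataset[String]`, and the sample pairs and the local SparkSession are hypothetical stand-ins for the Kafka messages.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("json-to-df").getOrCreate()
import spark.implicits._

// Hypothetical (key, value) pairs standing in for the messages read from Kafka.
val prices = Seq(
  ("k1", """{"ticker":"ORCL","price":41.13,"currency":"GBP"}"""),
  ("k2", """{"ticker":"IBM","price":140.2,"currency":"GBP"}""")
)

// Keep only the value portion of each pair, as in the map(x => x._2) step.
val jsonDS = prices.toDS().map(_._2)

// Spark infers the schema from the JSON strings themselves.
val jsonDF = spark.read.json(jsonDS)
jsonDF.printSchema()
```

Schema inference needs a pass over the data, so for large inputs it may be worth supplying an explicit schema via `spark.read.schema(...)` instead.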

This is an example of reading a MongoDB document into Spark and flattening
its nested structs:

dfrddMongoDB.printSchema
/*
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- operation: struct (nullable = true)
 |    |-- op_type: integer (nullable = true)
 |    |-- op_time: string (nullable = true)
 |-- priceInfo: struct (nullable = true)
 |    |-- key: string (nullable = true)
 |    |-- ticker: string (nullable = true)
 |    |-- timeissued: string (nullable = true)
 |    |-- price: double (nullable = true)
 |    |-- currency: string (nullable = true)
// An example document from the MongoDB collection
{
    "_id" : ObjectId("5cae4fa25d8b5279db785b43"),
    "priceInfo" : {
        "key" : "2ca8de24-eaf3-40d4-b0ef-c8b56534ceb5",
        "ticker" : "ORCL",
        "timeissued" : "2019-04-10T21:20:57",
        "price" : 41.13,
        "currency" : "GBP"
    },
    "operation" : {
        "op_type" : NumberInt(1),
        "op_time" : "1554927506012"
    }
}
*/
// Flatten the structs
val df = dfrddMongoDB.
               select(
                        'priceInfo.getItem("key").as("key")
                      , 'priceInfo.getItem("ticker").as("ticker")
                      , 'priceInfo.getItem("timeissued").as("timeissued")
                      , 'priceInfo.getItem("price").as("price")
                      , 'priceInfo.getItem("currency").as("currency")
                      , 'operation.getItem("op_type").as("op_type")
                      , 'operation.getItem("op_time").as("op_time")
                     )
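
The same flattening idea could be applied to the `jsonCol` string column from the question. As a sketch rather than a tested migration: `from_json` (in `org.apache.spark.sql.functions` since Spark 2.1) turns the string into a struct with one call, and the fields are then selected from that struct, so no per-field UDF is needed. The sample row, the local session, and the schema containing only `foo` and `bar` are assumptions for illustration. Whether the optimizer evaluates the parse exactly once per row may depend on the Spark version.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("parse-json-once").getOrCreate()
import spark.implicits._

// Hypothetical row shaped like the Cassandra SOURCE table.
val source = Seq(
  ("id-1", "a", "b", """{"foo": "val1", "bar": "val2"}""")
).toDF("id", "col1", "col2", "jsonCol")

// Only the fields to be extracted need to appear in the schema.
val jsonSchema = StructType(Seq(
  StructField("foo", StringType),
  StructField("bar", StringType)
))

// Parse jsonCol into a struct column, then pull the fields out of it.
val dest = source
  .withColumn("parsed", from_json(col("jsonCol"), jsonSchema))
  .select(
    col("id"), col("col1"), col("col2"),
    col("jsonCol").as("raw_json"),
    col("parsed.foo").as("foo"),
    col("parsed.bar").as("bar")
  )
dest.show(false)
```

From there, `dest.write.jdbc(...)` with the Oracle JDBC driver would be the usual next step; the connection details are left out here.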

HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 19 Jul 2019 at 21:48, Richard <fifistorm...@gmail.com> wrote:

> Let's say I use Spark to migrate some data from a Cassandra table to an
> Oracle table.
> Cassandra Table:
> CREATE TABLE SOURCE(
> id UUID PRIMARY KEY,
> col1 text,
> col2 text,
> jsonCol text
> );
> example jsonCol value: {"foo": "val1", "bar": "val2"}
>
> I am trying to extract fields from the json column while importing to
> Oracle table
> Destination (
> id varchar2(50),
> col1 varchar(128),
> col2 varchar(128),
> raw_json clob,
> foo varchar2(256),
> bar varchar2(256)
> );
>
> What I have done:
> a separate UDF each for foo and bar.
> This approach works, but it means I need to deserialize the raw JSON into a
> JSON object twice, and things get worse if I want to extract many fields
> from the JSON.
> example:
> df = df.withColumn("foo", getFoo.apply(col("jsonCol")))
>      .withColumn("bar", getBar.apply(col("jsonCol")));
> // getFoo and getBar are UserDefinedFunction
>
> How do I parse the raw JSON string only once and explode the fields I need
> into multiple columns for Oracle in Spark?
>
> Thanks,
