I'm looking forward to 2.1 but, in the meantime, you can pull the
specific column out into an RDD of JSON strings, pass that RDD to
read.json(), and then join the results back onto your initial DF.

Here is an example of what we do to unpack headers from Avro log data:

from pyspark.sql.functions import col, concat, decode, lit

def jsonLoad(path):
    #
    # load in the raw Avro DF
    raw = (sqlContext.read
            .format('com.databricks.spark.avro')
            .load(path)
        )
    #
    # define the JSON blob, including primary key elements (hi and lo)
    # so the parsed rows can be joined back on afterwards
    #
    JSONBlob = concat(
        lit('{'),
        concat(lit('"lo":'), col('header.eventId.lo').cast('string'), lit(',')),
        concat(lit('"hi":'), col('header.eventId.hi').cast('string'), lit(',')),
        concat(lit('"response":'), decode('requestResponse.response', 'UTF-8')),
        lit('}')
    )
    #
    # extract the JSON blob as an RDD of strings
    rawJSONString = raw.select(JSONBlob).rdd.map(lambda x: str(x[0]))
    #
    # parse the JSON strings into a structured DF
    structuredJSON = sqlContext.read.json(rawJSONString)
    #
    # join the structured JSON back onto the initial DF using the hi and lo
    # join keys, then drop the duplicated key columns
    final = (raw.join(structuredJSON,
                ((raw['header.eventId.lo'] == structuredJSON['lo']) &
                 (raw['header.eventId.hi'] == structuredJSON['hi'])),
                'left_outer')
            .drop('hi')
            .drop('lo')
        )
    #
    # win
    return final
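
Calling it is then just (the path below is only a placeholder for
wherever your Avro logs live):

df = jsonLoad('/data/logs/2016/11/16/*.avro')
df.printSchema()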

On Wed, Nov 16, 2016 at 10:50 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> On Wed, Nov 16, 2016 at 2:49 AM, Hyukjin Kwon <gurwls...@gmail.com> wrote:
>
>> Maybe it sounds like you are looking for from_json/to_json functions
>> after en/decoding properly.
>>
>
> Which are new built-in functions that will be released with Spark 2.1.
>
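
For completeness, once 2.1 lands I'd expect the RDD round-trip above to
become unnecessary: you should be able to parse the JSON column in place
with from_json. A rough, untested sketch (the response schema here is
just an assumption -- adjust it to your payload):

from pyspark.sql.functions import col, decode, from_json
from pyspark.sql.types import StructType, StructField, StringType

# hypothetical schema for the embedded JSON payload
responseSchema = StructType([
    StructField('status', StringType()),
    StructField('body', StringType()),
])

# parse the decoded JSON string column directly; no RDD hop or re-join
parsed = raw.withColumn(
    'parsedResponse',
    from_json(decode(col('requestResponse.response'), 'UTF-8'), responseSchema)
)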
