Hi Daniel,

Right now, you need to do the transformation manually. The feature you need
is under development (https://issues.apache.org/jira/browse/SPARK-4190).

Thanks,

Yin

On Tue, Nov 4, 2014 at 2:44 AM, Gerard Maas <gerard.m...@gmail.com> wrote:

> You could transform the json to a case class instead of serializing it
> back to a String. The resulting RDD[MyCaseClass] is then directly usable as
> a SchemaRDD using the register function implicitly provided by 'import
> sqlContext.schemaRDD'. Then the rest of your pipeline will remain the same.
>
> -kr, Gerard
> On Nov 4, 2014 5:05 AM, "Daniel Mahler" <dmah...@gmail.com> wrote:
>
>> I am trying to convert terabytes of json log files into parquet files.
>> but I need to clean it a little first.
>> I end up doing the following
>>
>> txt = sc.textFile(inpath).coalesce(800)
>>
>> val json = (for {
>>          line <- txt
>>          JObject(child) = parse(line)
>>          child2 = (for {
>>            JField(name, value) <- child
>>            _ <- patt(name) // filter fields with invalid names
>>          } yield JField(name.toLowerCase, value))
>> } yield compact(render(JObject(child2))))
>>
>> sqx.jsonRDD(json, 5e-2).saveAsParquetFile(outpath)
>>
>> And glaring inefficiency is that after parsing and cleaning the data i
>> reserialize it
>> by calling compact(render(JObject(child2)))) only to pass the text
>> to jsonRDD to be parsed agian. However I see no way  to turn an RDD of
>> json4s objects directly into a SchemRDD without turning it back into text
>> first
>>
>> Is there any way to do this?
>>
>> I am also open to other suggestions for speeding up the above code,
>> it is very slow in its current form.
>>
>> I would also like to make jsonFile drop invalid json records rather than
>> failing the entire job. Is that possible?
>>
>> thanks
>> Daniel
>>
>>

Reply via email to