Hi Daniel,

Right now, you need to do the transformation manually. The feature you
need is under development (https://issues.apache.org/jira/browse/SPARK-4190).
Thanks,

Yin

On Tue, Nov 4, 2014 at 2:44 AM, Gerard Maas <gerard.m...@gmail.com> wrote:

> You could transform the JSON into a case class instead of serializing it
> back to a String. The resulting RDD[MyCaseClass] is then directly usable
> as a SchemaRDD via the implicit conversion provided by 'import
> sqlContext.createSchemaRDD'. The rest of your pipeline then remains the
> same.
>
> -kr, Gerard
>
> On Nov 4, 2014 5:05 AM, "Daniel Mahler" <dmah...@gmail.com> wrote:
>
>> I am trying to convert terabytes of JSON log files into Parquet files,
>> but I need to clean the data a little first. I end up doing the
>> following:
>>
>> val txt = sc.textFile(inpath).coalesce(800)
>>
>> val json = (for {
>>   line <- txt
>>   JObject(child) = parse(line)
>>   child2 = (for {
>>     JField(name, value) <- child
>>     _ <- patt(name) // filter fields with invalid names
>>   } yield JField(name.toLowerCase, value))
>> } yield compact(render(JObject(child2))))
>>
>> sqx.jsonRDD(json, 5e-2).saveAsParquetFile(outpath)
>>
>> The glaring inefficiency is that after parsing and cleaning the data, I
>> reserialize it by calling compact(render(JObject(child2))), only to pass
>> the text to jsonRDD to be parsed again. However, I see no way to turn an
>> RDD of json4s objects directly into a SchemaRDD without turning it back
>> into text first.
>>
>> Is there any way to do this?
>>
>> I am also open to other suggestions for speeding up the above code; it
>> is very slow in its current form.
>>
>> I would also like to make jsonFile drop invalid JSON records rather than
>> failing the entire job. Is that possible?
>>
>> thanks
>> Daniel
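
For concreteness, here is a minimal sketch of Gerard's case-class approach
applied to the pipeline above, written against the Spark 1.1-era API
(SchemaRDD, createSchemaRDD, saveAsParquetFile). The LogRecord class, its
three fields, and the argument handling are illustrative assumptions, not
the actual log schema:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.json4s._
import org.json4s.jackson.JsonMethods.parse
import scala.util.Try

// Hypothetical schema -- the real case class must mirror the cleaned fields.
case class LogRecord(timestamp: String, level: String, message: String)

object JsonToParquet {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("json-to-parquet"))
    val sqlContext = new SQLContext(sc)
    // Spark 1.x implicit conversion: RDD[<case class>] => SchemaRDD
    import sqlContext.createSchemaRDD

    val Array(inpath, outpath) = args

    val txt = sc.textFile(inpath).coalesce(800)

    // Parse each line exactly once; drop lines that fail to parse or
    // lack a required field, instead of failing the whole job.
    val records = txt.flatMap { line =>
      Try(parse(line)).toOption.flatMap { jv =>
        for {
          JString(ts)  <- (jv \ "timestamp").toOption
          JString(lvl) <- (jv \ "level").toOption
          JString(msg) <- (jv \ "message").toOption
        } yield LogRecord(ts, lvl.toLowerCase, msg)
      }
    }

    // The implicit conversion turns records into a SchemaRDD, so it can be
    // written as Parquet directly -- no re-serialization to JSON text.
    records.saveAsParquetFile(outpath)
  }
}

Note the trade-off: this replaces jsonRDD's schema inference with a fixed
schema, so it only applies when the set of fields to keep is known ahead of
time; the original name-filtering approach handles arbitrary field sets.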