problem extracting map from json
Hi guys,

I'm trying to extract a Map[String, Any] from a JSON string. This works well in every plain Scala REPL I have tried (both Scala 2.11 and 2.10, using both the json4s and liftweb-json libraries), but if I try to do the same thing in spark-shell I always get a "No information known about type" exception. I've tried different versions of json4s and liftweb-json, with the same result.

I was wondering if anybody has an idea what I might be doing wrong.

I'm using Spark 1.6.1 precompiled with the MapR Hadoop distro, on Scala 2.10.5.

    scala> import org.json4s._
    import org.json4s._

    scala> import org.json4s.jackson.JsonMethods._
    import org.json4s.jackson.JsonMethods._

    scala>

    scala> implicit val formats = org.json4s.DefaultFormats
    formats: org.json4s.DefaultFormats.type = org.json4s.DefaultFormats$@46270641

    scala> val json = parse(""" { "name": "joe", "children": [ { "name": "Mary", "age": 5 }, { "name": "Mazy", "age": 3 } ] } """)
    json: org.json4s.JValue = JObject(List((name,JString(joe)), (children,JArray(List(JObject(List((name,JString(Mary)), (age,JInt(5)))), JObject(List((name,JString(Mazy)), (age,JInt(3))))))))))

    scala> json.extract[Map[String, Any]]
    org.json4s.package$MappingException: No information known about type
        at org.json4s.Extraction$ClassInstanceBuilder.org$json4s$Extraction$ClassInstanceBuilder$$instantiate(Extraction.scala:465)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$result$6.apply(Extraction.scala:491)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$result$6.apply(Extraction.scala:488)
        at org.json4s.Extraction$.org$json4s$Extraction$$customOrElse(Extraction.scala:500)
        at org.json4s.Extraction$ClassInstanceBuilder.result(Extraction.scala:488)
        at org.json4s.Extraction$.extract(Extraction.scala:332)
        at org.json4s.Extraction$$anonfun$extract$5.apply(Extraction.scala:316)
        at org.json4s.Extraction$$anonfun$extract$5.apply(Extraction.scala:316)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.json4s.Extraction$.extract(Extraction.scala:316)
        at org.json4s.Extraction$.extract(Extraction.scala:42)
        at org.json4s.ExtractableJsonAstNode.extract(ExtractableJsonAstNode.scala:21)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
        at $iwC$$iwC$$iwC.<init>(<console>:57)
        at $iwC$$iwC.<init>(<console>:59)
        at $iwC.<init>(<console>:61)
        at <init>(<console>:63)
        at .<init>(<console>:67)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at
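
This does not explain why extract fails in spark-shell, but one possible workaround can be sketched, assuming json4s with the Jackson backend is on the classpath as in the session above. JObject.values converts the AST into plain Scala collections without going through the reflection-based Extraction code that appears in the stack trace. The result may differ slightly from what extract returns elsewhere: integers come back as BigInt and arrays as List[Any]. The names asMap and children are made up for this example.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

val json = parse("""{ "name": "joe", "children": [ { "name": "Mary", "age": 5 }, { "name": "Mazy", "age": 3 } ] }""")

// JObject#values builds a Map[String, Any] directly from the AST,
// so no implicit Formats and no reflection-based extraction are involved.
val asMap: Map[String, Any] = json match {
  case obj: JObject => obj.values
  case other        => sys.error(s"expected a JSON object, got: $other")
}

// Nested arrays come back as List[Any], nested objects as Map[String, Any].
// The cast is unchecked; it is only safe here because the input shape is known.
val children = asMap("children").asInstanceOf[List[Map[String, Any]]]
println(children.map(_("name")))
```

Whether this is acceptable depends on whether the BigInt and List representations are good enough for the code that consumes the map.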
Re: groupBy and store in parquet
Hi Xinh,

sorry for my late reply.

It's slow for two reasons (at least to my knowledge):

1. Lots of I/O: writing as JSON, then reading it back and writing it again as Parquet.
2. Because RDDs cannot be nested, I can't run the loop and filter by event_type in parallel. This applies to your solution as well (3rd step).

I ended up with the suggestion you proposed: in real time, partition by event_type and store as JSON (which is pretty fast), and have another job, which runs less frequently, read the JSON files and store them as Parquet.

Thank you very much.

Best regards,
Michal

On 05/05/2016 06:02 PM, Xinh Huynh wrote:
> Hi Michal,
>
> Why is your solution so slow? Is it from the file IO caused by storing in a temp file as JSON and then reading it back in and writing it as Parquet? How are you getting "events" in the first place?
>
> Do you have the original Kafka messages as an RDD[String]? Then how about:
>
> 1. Start with eventsAsRDD : RDD[String] (before converting to DF)
> 2. eventsAsRDD.map() --> use a RegEx to parse out the event_type of each event. For example, search the string for {"event_type"="[.*]"}
> 3. Now, filter by each event_type to create a separate RDD for each type, and convert those to DF. You only convert to DF for events of the same type, so you avoid the NULLs.
>
> Xinh
>
> On Thu, May 5, 2016 at 2:52 AM, Michal Vince <vince.mic...@gmail.com> wrote:
>> Hi Xinh,
>>
>> For (1), the biggest problem is those null columns. E.g. the DF will have ~1000 columns, so every partition of that DF will have ~1000 columns; one of the partitions can have 996 null columns, which is a big waste of space (in my case more than 80% on average).
>>
>> For (2), I can't really change anything, as the source belongs to the 3rd party.
>>
>> Miso
>>
>> On 05/04/2016 05:21 PM, Xinh Huynh wrote:
>>> Hi Michal,
>>>
>>> For (1), would it be possible to partitionBy two columns to reduce the size? Something like partitionBy("event_type", "date").
>>>
>>> For (2), is there a way to separate the different event types upstream, like on different Kafka topics, and then process them separately?
>>>
>>> Xinh
>>>
>>> On Wed, May 4, 2016 at 7:47 AM, Michal Vince <vince.mic...@gmail.com> wrote:
>>>> Hi guys,
>>>>
>>>> I'm trying to store a Kafka stream with ~5k events/s as efficiently as possible in Parquet format on HDFS.
>>>>
>>>> I can't make any changes to Kafka (it belongs to a 3rd party).
>>>>
>>>> Events in Kafka are in JSON format, but the problem is that there are many different event types (from different subsystems, with different numbers of fields, different sizes, etc.), so it doesn't make any sense to store them in the same file.
>>>>
>>>> I was trying to read the data into a DF, then partition it by event_type and store it:
>>>>
>>>>     events.write.partitionBy("event_type").format("parquet").mode(org.apache.spark.sql.SaveMode.Append).save(tmpFolder)
>>>>
>>>> which is quite fast but has 2 drawbacks that I'm aware of:
>>>>
>>>> 1. the output folder has only one partition, which can be huge
>>>> 2. all DFs created like this share the same schema, so even DFs with few fields have tons of null fields
>>>>
>>>> My second try is a bit naive and really, really slow (you can see why in the code): filter the DF by event type and store the results temporarily as JSON (to get rid of the null fields):
>>>>
>>>>     val event_types = events.select($"event_type").distinct().collect() // get event_types in this batch
>>>>     for (row <- event_types) {
>>>>       val currDF = events.filter($"event_type" === row.get(0))
>>>>       val tmpPath = tmpFolder + row.get(0)
>>>>       currDF.write.format("json").mode(org.apache.spark.sql.SaveMode.Append).save(tmpPath)
>>>>       sqlContext.read.json(tmpPath).write.format("parquet").save(basePath)
>>>>     }
>>>>     hdfs.delete(new Path(tmpFolder), true)
>>>>
>>>> Do you have any suggestions for a better solution to this?
>>>>
>>>> Thanks
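
Step 2 of Xinh's suggestion quoted above could be sketched roughly as follows. It runs on a plain Seq[String] as a stand-in for eventsAsRDD; on an RDD[String] the same function would be passed to map or filter. The pattern assumes the field is written as "event_type": "..." with a colon, as in standard JSON, and that the value contains no escaped quotes. EventType, eventTypeOf, byType and the sample events are all made up for illustration.

```scala
import scala.util.matching.Regex

// Matches "event_type": "<value>" and captures the value.
// Assumption: the value contains no escaped quotes.
val EventType: Regex = "\"event_type\"\\s*:\\s*\"([^\"]*)\"".r

// Returns the event type of one raw JSON message, if the field is present.
def eventTypeOf(rawJson: String): Option[String] =
  EventType.findFirstMatchIn(rawJson).map(_.group(1))

// Hypothetical sample messages standing in for eventsAsRDD.
val events = Seq(
  """{"event_type": "login", "user": "joe"}""",
  """{"event_type": "purchase", "user": "mary", "amount": 5}""",
  """{"event_type": "login", "user": "mazy"}""",
  """{"user": "nobody"}"""
)

// Group the raw strings by type; messages without the field are dropped here.
val byType: Map[String, Seq[String]] =
  events.flatMap(e => eventTypeOf(e).map(t => t -> e))
    .groupBy(_._1)
    .map { case (t, pairs) => t -> pairs.map(_._2) }

byType.foreach { case (t, msgs) => println(s"$t: ${msgs.size} event(s)") }
```

For step 3, something like eventsAsRDD.filter(e => eventTypeOf(e).exists(_ == t)) would give the RDD for a single type t. That still means one pass over the data per event type, which is the limitation mentioned in the reply above.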
Re: groupBy and store in parquet
Hi Xinh,

For (1), the biggest problem is those null columns. E.g. the DF will have ~1000 columns, so every partition of that DF will have ~1000 columns; one of the partitions can have 996 null columns, which is a big waste of space (in my case more than 80% on average).

For (2), I can't really change anything, as the source belongs to the 3rd party.

Miso

On 05/04/2016 05:21 PM, Xinh Huynh wrote:
> Hi Michal,
>
> For (1), would it be possible to partitionBy two columns to reduce the size? Something like partitionBy("event_type", "date").
>
> For (2), is there a way to separate the different event types upstream, like on different Kafka topics, and then process them separately?
>
> Xinh
>
> On Wed, May 4, 2016 at 7:47 AM, Michal Vince <vince.mic...@gmail.com> wrote:
>> Hi guys,
>>
>> I'm trying to store a Kafka stream with ~5k events/s as efficiently as possible in Parquet format on HDFS.
>>
>> I can't make any changes to Kafka (it belongs to a 3rd party).
>>
>> Events in Kafka are in JSON format, but the problem is that there are many different event types (from different subsystems, with different numbers of fields, different sizes, etc.), so it doesn't make any sense to store them in the same file.
>>
>> I was trying to read the data into a DF, then partition it by event_type and store it:
>>
>>     events.write.partitionBy("event_type").format("parquet").mode(org.apache.spark.sql.SaveMode.Append).save(tmpFolder)
>>
>> which is quite fast but has 2 drawbacks that I'm aware of:
>>
>> 1. the output folder has only one partition, which can be huge
>> 2. all DFs created like this share the same schema, so even DFs with few fields have tons of null fields
>>
>> My second try is a bit naive and really, really slow (you can see why in the code): filter the DF by event type and store the results temporarily as JSON (to get rid of the null fields):
>>
>>     val event_types = events.select($"event_type").distinct().collect() // get event_types in this batch
>>     for (row <- event_types) {
>>       val currDF = events.filter($"event_type" === row.get(0))
>>       val tmpPath = tmpFolder + row.get(0)
>>       currDF.write.format("json").mode(org.apache.spark.sql.SaveMode.Append).save(tmpPath)
>>       sqlContext.read.json(tmpPath).write.format("parquet").save(basePath)
>>     }
>>     hdfs.delete(new Path(tmpFolder), true)
>>
>> Do you have any suggestions for a better solution to this?
>>
>> Thanks
groupBy and store in parquet
Hi guys,

I'm trying to store a Kafka stream with ~5k events/s as efficiently as possible in Parquet format on HDFS.

I can't make any changes to Kafka (it belongs to a 3rd party).

Events in Kafka are in JSON format, but the problem is that there are many different event types (from different subsystems, with different numbers of fields, different sizes, etc.), so it doesn't make any sense to store them in the same file.

I was trying to read the data into a DF, then partition it by event_type and store it:

    events.write.partitionBy("event_type").format("parquet").mode(org.apache.spark.sql.SaveMode.Append).save(tmpFolder)

which is quite fast but has 2 drawbacks that I'm aware of:

1. the output folder has only one partition, which can be huge
2. all DFs created like this share the same schema, so even DFs with few fields have tons of null fields

My second try is a bit naive and really, really slow (you can see why in the code): filter the DF by event type and store the results temporarily as JSON (to get rid of the null fields):

    val event_types = events.select($"event_type").distinct().collect() // get event_types in this batch
    for (row <- event_types) {
      val currDF = events.filter($"event_type" === row.get(0))
      val tmpPath = tmpFolder + row.get(0)
      currDF.write.format("json").mode(org.apache.spark.sql.SaveMode.Append).save(tmpPath)
      sqlContext.read.json(tmpPath).write.format("parquet").save(basePath)
    }
    hdfs.delete(new Path(tmpFolder), true)

Do you have any suggestions for a better solution to this?

Thanks
1.6.0 spark.sql datetime conversion problem
Hi guys,

I'm using Spark 1.6.0 and I'm not sure whether I found a bug or I'm doing something wrong.

I'm playing with DataFrames and converting ISO 8601 timestamps with millis to my time zone, which is Europe/Bratislava, with the from_utc_timestamp function from spark.sql.functions.

The problem is that Europe/Bratislava is UTC+1 in February, and from 27th March it's going to be UTC+2, but from_utc_timestamp ignores this and always converts to UTC+2.

Am I doing something wrong when converting?

E.g.

    myDF.withColumn("time", from_utc_timestamp(l.col("timestamp"), "Europe/Bratislava"))

and the output:

    time                   timestamp
    2016-02-22 02:59:11.0  2016-02-22T00:59:11.000Z
    2016-02-20 20:16:35.0  2016-02-20T18:16:35.000Z
    2016-02-20 05:17:29.0  2016-02-20T03:17:29.000Z
    2016-02-18 18:29:06.0  2016-02-18T16:29:06.000Z
    2016-02-17 01:47:20.0  2016-02-16T23:47:20.000Z
    2016-02-15 07:05:04.0  2016-02-15T05:05:04.000Z
    2016-02-13 23:25:14.0  2016-02-13T21:25:14.000Z
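
As a cross-check outside Spark, the expected result for the first row can be computed with java.time (an assumption: Java 8 or later is available). For an instant in February 2016, Europe/Bratislava should be at UTC+1, so 2016-02-22T00:59:11.000Z should map to 01:59:11 local time rather than the 02:59:11 shown above. The second instant, on 28 March 2016, is a made-up value used only to show the summer-time offset.

```scala
import java.time.{Instant, LocalDateTime, ZoneId, ZoneOffset}

val zone = ZoneId.of("Europe/Bratislava")

// First row from the output above, and a hypothetical instant after the switch to summer time.
val winter = Instant.parse("2016-02-22T00:59:11.000Z")
val summer = Instant.parse("2016-03-28T00:59:11.000Z")

// Offset from UTC that the zone rules prescribe at each instant.
val winterOffset: ZoneOffset = zone.getRules.getOffset(winter)
val summerOffset: ZoneOffset = zone.getRules.getOffset(summer)

// Wall-clock time in Bratislava for the first row.
val winterLocal: LocalDateTime = LocalDateTime.ofInstant(winter, zone)

println(s"$winter -> $winterLocal (offset $winterOffset)")
println(s"offset after 27 March 2016: $summerOffset")
```

If Spark's result is one hour ahead of this, one thing that may be worth checking (a guess, not verified against 1.6.0) is whether the timestamp column is rendered in the JVM's default time zone. If that default is also Europe/Bratislava, the one-hour shift could effectively be applied twice.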