Hi Xinh
For (1), the biggest problem is the null columns. E.g. the DF will have
~1000 columns, so every partition of that DF will also have ~1000
columns, and one partition may have 996 all-null columns, which is a
big waste of space (in my case more than 80% on average).
For (2), I can't really change anything, as the source belongs to a 3rd
party.
Miso
On 05/04/2016 05:21 PM, Xinh Huynh wrote:
Hi Michal,
For (1), would it be possible to partitionBy two columns to reduce the
size? Something like partitionBy("event_type", "date").
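To make the suggestion concrete: partitionBy with two columns produces nested Hive-style directories, one level per column, so each leaf directory holds far less data. A plain-Scala sketch of the resulting layout (no Spark needed; the "date" column and the example values are assumptions for illustration):

```scala
// Builds the Hive-style partition path that
// df.write.partitionBy("event_type", "date") would write under.
def partitionPath(base: String, cols: Seq[(String, String)]): String =
  cols.foldLeft(base) { case (path, (col, value)) => s"$path/$col=$value" }

// Example: one leaf directory per (event_type, date) combination.
val leaf = partitionPath("/data/events",
  Seq("event_type" -> "click", "date" -> "2016-05-04"))
// leaf == "/data/events/event_type=click/date=2016-05-04"
```

Spark also prunes these directories on read, so a filter on event_type and date only touches the matching leaves.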
For (2), is there a way to separate the different event types
upstream, like on different Kafka topics, and then process them
separately?
Xinh
On Wed, May 4, 2016 at 7:47 AM, Michal Vince <vince.mic...@gmail.com> wrote:
Hi guys
I'm trying to store a Kafka stream of ~5k events/s as efficiently as
possible in Parquet format on HDFS.
I can't make any changes to Kafka (it belongs to a 3rd party).
The events in Kafka are in JSON format, but the problem is that there
are many different event types (from different subsystems, with
different numbers of fields, different sizes, etc.), so it doesn't
make any sense to store them all in the same file.
I was trying to read the data into a DF and then partition it by
event_type and store it:

events.write
  .partitionBy("event_type")
  .format("parquet")
  .mode(org.apache.spark.sql.SaveMode.Append)
  .save(tmpFolder)
which is quite fast but has 2 drawbacks that I'm aware of:
1. the output folder has only one partition, which can be huge
2. all DFs created like this share the same schema, so even DFs with
only a few fields end up with tons of null fields
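The second drawback can be sketched in plain Scala (no Spark needed): when mixed JSON event types are read into one DataFrame, Spark infers the union of all fields, so each row carries nulls for every field its own type never uses. The field names below are made up for illustration:

```scala
// Two hypothetical event types with mostly disjoint fields.
val login  = Map("event_type" -> "login",  "user" -> "bob")
val metric = Map("event_type" -> "metric", "cpu" -> "0.7", "mem" -> "512")

// Spark-style union schema over both event types.
val unionSchema = (login.keySet ++ metric.keySet).toSeq.sorted

// Each event padded to the union schema, as rows of a single DF would be;
// None stands for a null column.
def padded(e: Map[String, String]): Seq[Option[String]] =
  unionSchema.map(e.get)

val nullsInLogin = padded(login).count(_.isEmpty)
// With ~1000 columns across all types, a narrow event type wastes
// most of its row on nulls, as described above.
```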
My second try is a bit naive and really, really slow (you can see why
in the code): filter the DF by event type and store each type
temporarily as JSON (to get rid of the null fields):
val event_types = events.select($"event_type").distinct().collect() // get event types in this batch

for (row <- event_types) {
  val currDF = events.filter($"event_type" === row.get(0))
  val tmpPath = tmpFolder + row.get(0)
  currDF.write.format("json").mode(org.apache.spark.sql.SaveMode.Append).save(tmpPath)
  sqlContext.read.json(tmpPath).write.format("parquet").save(basePath)
}

hdfs.delete(new Path(tmpFolder), true)
Do you have any suggestions for a better solution to this?
thanks