Hi guys

I'm trying to store a Kafka stream of ~5k events/s in HDFS as efficiently as possible, in Parquet format.

I can't make any changes to Kafka (it belongs to a 3rd party).


Events in Kafka are in JSON format, but the problem is that there are many different event types (from different subsystems, with different numbers of fields, different sizes, etc.), so it doesn't make sense to store them in the same file.


I tried reading the data into a DF, then partitioning it by event_type and storing it:

events.write.partitionBy("event_type").format("parquet").mode(org.apache.spark.sql.SaveMode.Append).save(tmpFolder)

which is quite fast but has 2 drawbacks that I'm aware of:

1. The output folder has only one partition, which can be huge.

2. All DFs created like this share the same schema, so even DFs with few fields have tons of null fields.


My second try is a bit naive and really slow (you can see why in the code): filter the DF by event type and store each subset temporarily as JSON (to get rid of the null fields).

// get the event_types in this batch
val event_types = events.select($"event_type").distinct().collect()

for (row <- event_types) {
  val currDF = events.filter($"event_type" === row.get(0))
  val tmpPath = tmpFolder + row.get(0)

  // write as JSON first, so that null fields are dropped
  currDF.write.format("json").mode(org.apache.spark.sql.SaveMode.Append).save(tmpPath)
  // read it back with the narrower inferred schema and append it as Parquet
  sqlContext.read.json(tmpPath).write.format("parquet").mode(org.apache.spark.sql.SaveMode.Append).save(basePath)
}
hdfs.delete(new Path(tmpFolder), true)
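
A possible variation on the loop above, as an untested sketch rather than a measured solution: skip the temporary JSON files and do the JSON round-trip in memory. It assumes a Spark 1.x DataFrame (where toJSON returns an RDD[String]), that event_type is a non-null string column, and that writing each type to its own subfolder of basePath is acceptable (that layout is my assumption, not something I already have in place). events, sqlContext and basePath are the same values as in the code above.

```scala
import org.apache.spark.sql.SaveMode
import sqlContext.implicits._

// Cache the batch so the per-type filters below do not recompute it.
events.cache()

// Event types present in this batch, as plain strings.
val eventTypes = events.select($"event_type").distinct().collect().map(_.getString(0))

for (eventType <- eventTypes) {
  // toJSON leaves out null fields, so re-reading the strings infers a
  // schema with only the fields this event type actually carries.
  val jsonLines = events.filter($"event_type" === eventType).toJSON

  sqlContext.read.json(jsonLines)
    .write.format("parquet")
    .mode(SaveMode.Append)
    .save(basePath + "/" + eventType) // hypothetical per-type folder layout
}

events.unpersist()
```

This still runs one job per event type, so it only removes the extra HDFS write and read, not the repeated filtering.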


Do you have any suggestions for a better solution?

thanks

