Hi Michal,

For (1), would it be possible to partitionBy two columns to reduce the
size of each output directory? Something like partitionBy("event_type", "date").
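
A rough sketch of what I mean (it assumes your JSON carries a timestamp
field, here called "event_time", that a date column can be derived from;
adjust the names to your schema):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.to_date

// "event_time" is a placeholder; use whatever timestamp field your events have.
val withDate = events.withColumn("date", to_date($"event_time"))

withDate.write
  .partitionBy("event_type", "date")  // one directory per (event_type, date) pair
  .format("parquet")
  .mode(SaveMode.Append)
  .save(tmpFolder)

That should keep each leaf directory down to one day of a single event type.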

For (2), is there a way to separate the different event types upstream,
like on different Kafka topics, and then process them separately?
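
If that were possible, a minimal sketch (assuming the Kafka 0.8
direct-stream API, made-up per-event-type topic names, and that ssc,
kafkaParams, sqlContext and basePath are already defined):

import kafka.serializer.StringDecoder
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical per-event-type topics: each stream infers its own schema
// from its own JSON, so there is no padding with nulls from other types.
val topics = Seq("events_login", "events_click")
topics.foreach { topic =>
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set(topic))
  stream.map(_._2).foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      sqlContext.read.json(rdd)
        .write.mode(SaveMode.Append)
        .parquet(basePath + "/" + topic)
    }
  }
}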

Xinh

On Wed, May 4, 2016 at 7:47 AM, Michal Vince <vince.mic...@gmail.com> wrote:

> Hi guys
>
> I'm trying to store a Kafka stream with ~5k events/s as efficiently as
> possible in Parquet format on HDFS.
>
> I can't make any changes to Kafka (it belongs to a 3rd party).
>
>
> Events in Kafka are in JSON format, but the problem is that there are many
> different event types (from different subsystems, with different numbers of
> fields, different sizes, etc.), so it doesn't make any sense to store them
> all in the same file.
>
>
> I was trying to read the data into a DataFrame and then partition it by
> event_type when writing it out:
>
> events.write
>   .partitionBy("event_type")
>   .format("parquet")
>   .mode(org.apache.spark.sql.SaveMode.Append)
>   .save(tmpFolder)
>
> which is quite fast but has 2 drawbacks that I'm aware of:
>
> 1. the output folder has only one partition, which can be huge
>
> 2. all DFs created like this share the same schema, so even DFs with only a
> few fields end up with tons of null fields
>
>
> My second try is a bit naive and really, really slow (you can see why in the
> code): filter the DF by event type, store each subset temporarily as JSON (to
> get rid of the null fields), then re-read it and write it out as Parquet.
>
> val event_types = events.select($"event_type").distinct().collect() // get event_types in this batch
> for (row <- event_types) {
>   val currDF = events.filter($"event_type" === row.get(0))
>   val tmpPath = tmpFolder + row.get(0)
>   currDF.write.format("json").mode(org.apache.spark.sql.SaveMode.Append).save(tmpPath)
>   sqlContext.read.json(tmpPath).write.format("parquet").save(basePath)
> }
> hdfs.delete(new Path(tmpFolder), true)
>
>
> Do you have any suggestions for a better solution to this?
>
> thanks
>
