Re: Sum over many keys, over TB of parquet, from HDFS (S3)

2018-03-13 Thread Marián Dvorský
Hi Guillaume,

You may want to avoid the final join by using CombineFns.compose() instead.
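
For reference, a minimal sketch of what that could look like with the Beam Java SDK, assuming the input has already been keyed (the keyed collection, the flattened String composite key, and the step names are illustrative, not your actual code):

    import org.apache.beam.sdk.transforms.Combine;
    import org.apache.beam.sdk.transforms.CombineFns;
    import org.apache.beam.sdk.transforms.CombineFns.CoCombineResult;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TupleTag;

    // Tags used to pull each aggregate back out of the composed result.
    final TupleTag<Double> sumTag = new TupleTag<Double>() {};
    final TupleTag<Long> countTag = new TupleTag<Long>() {};

    // keyed is assumed to be a PCollection<KV<String, Double>> produced by
    // the Map-to-KV step (composite key flattened into one String).
    PCollection<KV<String, CoCombineResult>> sumAndCount =
        keyed.apply(
            "SumAndCountPerKey",
            Combine.perKey(
                CombineFns.compose()
                    // The value feeds a sum...
                    .with(
                        new SimpleFunction<Double, Double>() {
                          @Override
                          public Double apply(Double v) {
                            return v;
                          }
                        },
                        Sum.ofDoubles(),
                        sumTag)
                    // ...and the same value feeds a count, in the same
                    // Combine pass, so no join is needed afterwards.
                    .with(
                        new SimpleFunction<Double, Double>() {
                          @Override
                          public Double apply(Double v) {
                            return v;
                          }
                        },
                        Count.<Double>combineFn(),
                        countTag)));

    // Downstream, both aggregates come out of the same element:
    //   double sum  = kv.getValue().get(sumTag);
    //   long count = kv.getValue().get(countTag);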

Marian

On Tue, Mar 13, 2018 at 9:07 PM Guillaume Balaine  wrote:

> Hello Beamers,
>
> I have been a Beam advocate for a while now, and I am trying to use it for
> batch jobs as well as streaming jobs.
> I am trying to prove that it can be as fast as Spark for simple use cases.
> Currently, I have a Spark job that computes a sum + count over a TB of
> Parquet files and runs in roughly 90 min.
> Using the same resources (on EMR or on Mesos), I can't even come close to
> that.
> My Scio/Beam job on Flink sums roughly 20 billion rows of Parquet in 3 h
> with a parallelism of 144 and 1 TB of RAM (although I suspect many
> operators are idle, so I should probably use less parallelism with the
> same number of cores).
> I also implemented an identical version in pure Java, because I am unsure
> whether the Kryo-encoded tuples are properly managed by Flink's memory
> optimizations, and I am also testing it on Spark and Apex.
>
> My question is: is there any way to optimize this simple pipeline?
> HadoopFileIO (using Parquet and specific Avro coders to improve
> performance over generic records) ->
> Map to a KV with key (field1 str, field2 str, field3 str), ordered from
> most discriminating to least, and value (value double, 1) ->
> Combine.perKey(Sum), or map to the value alone and then join Sum and
> Count with a TupledPCollection
> -> AvroIO.Write
>
> The equivalent Spark job does a group by key and then a sum.
>
> Are there any tricks I am missing here?
>
> Thanks in advance for your help.
>


Sum over many keys, over TB of parquet, from HDFS (S3)

2018-03-13 Thread Guillaume Balaine
Hello Beamers,

I have been a Beam advocate for a while now, and I am trying to use it for
batch jobs as well as streaming jobs.
I am trying to prove that it can be as fast as Spark for simple use cases.
Currently, I have a Spark job that computes a sum + count over a TB of
Parquet files and runs in roughly 90 min.
Using the same resources (on EMR or on Mesos), I can't even come close to
that.
My Scio/Beam job on Flink sums roughly 20 billion rows of Parquet in 3 h
with a parallelism of 144 and 1 TB of RAM (although I suspect many
operators are idle, so I should probably use less parallelism with the
same number of cores).
I also implemented an identical version in pure Java, because I am unsure
whether the Kryo-encoded tuples are properly managed by Flink's memory
optimizations, and I am also testing it on Spark and Apex.

My question is: is there any way to optimize this simple pipeline?
HadoopFileIO (using Parquet and specific Avro coders to improve
performance over generic records) ->
Map to a KV with key (field1 str, field2 str, field3 str), ordered from
most discriminating to least, and value (value double, 1) ->
Combine.perKey(Sum), or map to the value alone and then join Sum and
Count with a TupledPCollection
-> AvroIO.Write
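
As a rough sketch of the single-pass variant of that pipeline in the Beam Java SDK (MyRecord, its getters, and the records collection are placeholders for the actual Avro type and input, not the real job):

    import org.apache.beam.sdk.transforms.Combine;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    // Pair-wise combiner over (sum, count) pairs, so sum and count are
    // produced in a single Combine.perKey instead of a join afterwards.
    class SumAndCountFn extends Combine.BinaryCombineFn<KV<Double, Long>> {
      @Override
      public KV<Double, Long> apply(KV<Double, Long> left, KV<Double, Long> right) {
        return KV.of(left.getKey() + right.getKey(),
                     left.getValue() + right.getValue());
      }
    }

    // records is assumed to be a PCollection<MyRecord> read by HadoopFileIO;
    // MyRecord and its getters stand in for the actual specific Avro type.
    PCollection<KV<String, KV<Double, Long>>> keyedPairs =
        records.apply(
            "ToKeyedPair",
            MapElements.via(
                new SimpleFunction<MyRecord, KV<String, KV<Double, Long>>>() {
                  @Override
                  public KV<String, KV<Double, Long>> apply(MyRecord r) {
                    // Composite key flattened into one String, most
                    // discriminating field first.
                    String key =
                        r.getField1() + "|" + r.getField2() + "|" + r.getField3();
                    return KV.of(key, KV.of(r.getValue(), 1L));
                  }
                }));

    PCollection<KV<String, KV<Double, Long>>> sumAndCount =
        keyedPairs.apply("SumAndCountPerKey", Combine.perKey(new SumAndCountFn()));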

The equivalent Spark job does a group by key and then a sum.
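
For comparison, the Spark side is probably roughly this shape (Spark SQL Java API; the path and field names here are illustrative guesses, not the actual job):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import static org.apache.spark.sql.functions.count;
    import static org.apache.spark.sql.functions.sum;

    SparkSession spark = SparkSession.builder().appName("SumPerKey").getOrCreate();

    // Group by the composite key and compute sum and count in one aggregation.
    Dataset<Row> result =
        spark.read().parquet("hdfs:///path/to/parquet")
            .groupBy("field1", "field2", "field3")
            .agg(sum("value"), count("value"));

    result.write().parquet("hdfs:///path/to/output");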

Are there any tricks I am missing here?

Thanks in advance for your help.