I have also wrestled with throughput for FileIO and BigQueryIO on Dataflow, and in my case the bottleneck came down to disk I/O throughput on the worker machines. Writing with FileIO or BigQueryIO involves several group-by-key operations, which on Dataflow require checkpointing state to disk.
Moving to SSD on the workers ended up being much more important in our case than tuning sharding and windowing. Window durations of 1, 5, or 10 minutes all sound reasonable, and I don't expect them to have a big impact on observed performance, since I believe Dataflow checkpoints to disk at the bundle level and the whole window does not need to fit in memory. You should set numShards at least as high as your number of workers. Try SSD (workerDiskType='compute.googleapis.com/projects//zones//diskTypes/pd-ssd') and diskSizeGb=500. Last I checked, 500 GB maximized the disk I/O per worker, per the Dataflow docs. Rough sketches of these settings follow below the quoted message.

On Fri, May 17, 2019 at 7:07 AM Ziyad Muhammed <[email protected]> wrote:
> Hi,
>
> I have a Kafka event stream that produces 80k messages per second.
> Messages are in protobuf format and are roughly 800 bytes in size. I need
> these events to be loaded into a BigQuery table using Beam/Dataflow.
>
> The option of streaming inserts was discarded due to the high cost (for
> the above throughput, the insertion cost alone was estimated at ~$9k).
>
> The two other options are to use the FILE_LOADS API to directly load the
> data as TableRows, or to write Avro files to GCS and do a scheduled load
> into BigQuery (for example, using Airflow).
>
> I tried both options, but couldn't get the desired throughput with up to
> 64 worker machine cores (e.g. 16 x n1-standard-4 or 64 x n1-standard-1,
> etc.). The Kafka topic has 16 partitions.
>
> For FILE_LOADS, I tried numShards of 1000 or more and a triggering
> frequency of 2, 5, or 10 minutes.
> For AvroIO, I'm missing the recommended values for the input parameters:
> 1. What window duration should I use? (I was using a 1-minute window so
> as not to hold too many elements in memory. Latency of the pipeline is
> not a big concern.)
> 2. What is the recommended number of shards? (num_shards = 0, so that
> the system decides, didn't work for the Dataflow runner.)
> 3. Should I customize the GCS upload buffer size?
> 4. I was choosing the number of worker nodes proportional to the number
> of Kafka topics. Is that the right approach? Which kind of machines, and
> how many, are suitable for this use case?
>
> And in general, is it at all possible to get this data into BigQuery in
> any batch fashion at a lower cost than streaming inserts?
>
> Any inputs are highly appreciated!
>
> Best
> Ziyad
>
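
To make the above concrete, here is a rough Java sketch of the worker-disk options plus a BigQueryIO FILE_LOADS write. The project/zone/table placeholders, the 5-minute triggering frequency, the 64 file shards, and the buildRowsFromKafka helper are all illustrative assumptions on my part, not values anyone has benchmarked in this thread:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class FileLoadsSketch {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);

    // SSD persistent disks on the workers; fill in your own project and zone.
    options.setWorkerDiskType(
        "compute.googleapis.com/projects/<project>/zones/<zone>/diskTypes/pd-ssd");
    // 500 GB, which (last I checked) maximizes per-worker disk throughput.
    options.setDiskSizeGb(500);

    Pipeline p = Pipeline.create(options);

    // Stand-in for the KafkaIO read and protobuf-to-TableRow conversion (not shown here).
    PCollection<TableRow> rows = buildRowsFromKafka(p);

    rows.apply("WriteViaFileLoads",
        BigQueryIO.writeTableRows()
            .to("<project>:<dataset>.<table>") // placeholder destination
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            // Load jobs every few minutes; 1-10 min should all behave similarly.
            .withTriggeringFrequency(Duration.standardMinutes(5))
            // At least as many file shards as workers, e.g. 64 for 64 worker cores.
            .withNumFileShards(64)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }

  // Hypothetical helper standing in for the original pipeline's Kafka source and decoding.
  private static PCollection<TableRow> buildRowsFromKafka(Pipeline p) {
    throw new UnsupportedOperationException("replace with your KafkaIO read + ParDo");
  }
}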

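For the AvroIO-to-GCS route in the questions above, a similarly rough sketch of a windowed write with an explicit shard count; the bucket path, 5-minute window, and 64 shards are again just assumptions to illustrate the shape:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class AvroWriteSketch {
  // 'events' stands in for the PCollection<GenericRecord> decoded from the protobuf stream,
  // and 'avroSchema' for its Avro schema.
  static void writeWindowedAvro(PCollection<GenericRecord> events, Schema avroSchema) {
    events
        // Any of the 1/5/10-minute durations should be fine; the window need not fit in memory.
        .apply("FixedWindows",
            Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(5))))
        .apply("WriteAvroToGcs",
            AvroIO.writeGenericRecords(avroSchema)
                .to("gs://<bucket>/events/part") // placeholder GCS prefix
                .withSuffix(".avro")
                .withWindowedWrites()
                // Runner-chosen sharding (num_shards = 0) reportedly didn't work here, so set
                // it explicitly, at least as high as the number of workers.
                .withNumShards(64));
  }
}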