Yes, provided there is a shuffle between the "Partition" operator and the
"Write" operator. Most runners will execute a Partition and all of its
outputs in the same thread at the same time; in many runners, inserting a
shuffle is what separates them.

Alternatively, you can look at the GcsOptions
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java#L75>
for a way to reduce the memory used by the network connection to GCS. That
helps if you want to lower the total memory taken by those 1000 connections
when they all run in the same thread.
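
To put rough numbers on this, here is a small back-of-the-envelope sketch in
plain Java (no Beam dependency). It only multiplies the number of writers open
in one thread by the upload buffer size, using the 64 MiB default quoted below.
The class name, the method names, and the 1 MiB "reduced" buffer are
hypothetical and chosen for illustration. I am also assuming that the relevant
setting at the link above is the upload buffer size option
(gcsUploadBufferSizeBytes); please check GcsOptions for the exact name and for
any restrictions on the value.

```java
// Rough estimate of memory held by GCS upload buffers in a single thread.
// Assumption: every open writer holds exactly one upload buffer.
final class GcsBufferEstimate {
    static final long MIB = 1024L * 1024L;

    /** Bytes held in upload buffers when one thread keeps openWriters writers open. */
    static long bufferBytesPerCore(int openWriters, long bufferSizeBytes) {
        return Math.multiplyExact((long) openWriters, bufferSizeBytes);
    }

    /** Converts a byte count to whole MiB (rounds down). */
    static long toMiB(long bytes) {
        return bytes / MIB;
    }

    public static void main(String[] args) {
        long defaultBuffer = 64 * MIB; // default buffer size quoted in this thread
        long reducedBuffer = 1 * MIB;  // hypothetical smaller buffer, for comparison only

        // 10 partitions with the default buffer: 640 MiB per core.
        System.out.println(toMiB(bufferBytesPerCore(10, defaultBuffer)));
        // 1000 partitions with the default buffer: 64000 MiB (62.5 GiB) per core.
        System.out.println(toMiB(bufferBytesPerCore(1000, defaultBuffer)));
        // 1000 partitions with the hypothetical 1 MiB buffer: 1000 MiB per core.
        System.out.println(toMiB(bufferBytesPerCore(1000, reducedBuffer)));
    }
}
```

So with 1000 partitions written from the same thread, the default buffer alone
would need about 62.5 GiB per core. Either a shuffle before the write or a
smaller buffer is needed to make that fit.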

On Fri, Feb 17, 2017 at 5:08 AM, Tobias Feldhaus <
[email protected]> wrote:

> Hi Dan,
>
> just one follow up question, as I have completely revised my pipeline now
> and want to write AVRO files to GCS first (one per day). You said that
>
>         by default writing to GCS uses a 64 MiB buffer so if you have 10
>         partitions you're allocating 640 MiB, per core, just for those
>         network buffers.
>
> Can I somehow optimize this? Would it be possible to partition a
> PCollection into 1000 partitions when using “enough” workers with
> “enough” memory?
>
> Tobias
>
>
> On 13.02.17, 09:42, "Tobias Feldhaus" <[email protected]>
> wrote:
>
>     Hi Dan,
>
>     Thank you for your response!
>
>     The approach I am using to write per window tables seems to work in
> batch and
>     streaming mode, at least this is claimed here [0], and I have
> confirmed this
>     with the author of this post. I also tested this with smaller files in
> my own
>     setup.
>
>     Would a shuffling operation on a non-key-value
>     input look like this [1], or is there already some PTransform in the
> SDK that I
>     am not aware of?
>
>     Tobias
>
>     [0] http://stackoverflow.com/a/40863609/5497956
>     [1] http://stackoverflow.com/a/40769445/5497956
>
>     From: Dan Halperin <[email protected]>
>     Reply-To: "[email protected]" <[email protected]>
>     Date: Saturday, 11 February 2017 at 21:31
>     To: "[email protected]" <[email protected]>
>     Subject: Re: Implicit file-size limit of input files?
>
>     Hi Tobias,
>
>     There should be no specific limitations in Beam on file size or
> otherwise, though different runners and different cluster sizes will
> have different potential scalability.
>
>     A few general Beam tips:
>
>     * Reading from compressed files is often a bottleneck, as this work is
> not parallelizable. If you find reading from compressed files is a
> bottleneck, you may want to follow it with a shuffling operation to improve
> parallelism as most runners can run the work pre- and post-shuffle on
> different machines (with different scaling levels).
>
>     * The Partition operator on its own does not improve parallelism.
> Depending on how the runner arranges the graph, when you partition N ways
> you may still execute all N partitions on the same machine. Again, a
> shuffling operator here will often let runners execute the N branches
> separately.
>
>        (There are known issues for certain sinks when N is high. For
> example, by default writing to GCS uses a 64 MiB buffer so if you have 10
> partitions you're allocating 640 MiB, per core, just for those network
> buffers.)
>
>     It sounds like you may be trying to use the "to(Partition function)"
> method of writing per window tables. The javadoc for BigQueryIO.Write
> clearly documents (https://github.com/apache/
> beam/blob/master/sdks/java/io/google-cloud-platform/src/
> main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L232) that
> it is not likely to work in "batch" runners.
>
>     I suggest reaching out to Google Cloud via the recommendations at
> https://cloud.google.com/dataflow/support if you have issues specific to
> the Google Cloud Dataflow runner.
>
>     Dan
>
>     On Fri, Feb 10, 2017 at 3:18 AM, Tobias Feldhaus <
> [email protected]> wrote:
>     Addendum: When running in streaming mode with version 0.5 of the SDK,
>     the elements are basically stuck before getting emitted [0], but the
> whole
>     process starts and is running up to a point when most likely the
> memory is
>     full (GC overhead error) and it crashes [0].
>
>     It seems like the Reshuffle that is taking place prevents any output
> to happen.
>     To get rid of that, I would need to find another way to write to a
> partition in
>     BigQuery in batch mode without using the workaround that is described
> here [1],
>     but I don't know how.
>
>     [0] https://puu.sh/tWInq/f41beae65b.png
>     [1] http://stackoverflow.com/questions/38114306/creating-
> writing-to-parititoned-bigquery-table-via-google-cloud-dataflow/40863609#
> 40863609
>
>     On 10.02.17, 10:34, "Tobias Feldhaus" <[email protected]>
> wrote:
>
>         Hi,
>
>         I am currently facing a problem with a relatively simple pipeline
> [0] that is
>         reading gzipped JSON files on Google Cloud Storage (GCS), adding a
> timestamp,
>         and pushing it into BigQuery. The only special thing I am doing as
> well is
>         partitioning it via a PartioningWindowFn that is assigning a
> partition
>         for each element as described here [1].
>
>         The pipeline works locally and remotely on the Google Cloud
> Dataflow Service
>         (GCDS) with smaller test files, but if I run it on the about 100
> real ones with
>         2GB each it breaks down in streaming and batch mode with different
> errors.
>
>         The pipeline runs in batch mode, but in the end it gets stuck with
> processing only
>         1000-5000 streaming inserts per second to BQ, while constantly
> scaling up the
>         number of instances [2]. As you can see in the screenshot the
> shuffle never
>         started, before I had to stop it to cut the costs.
>
>         If run in streaming mode, the pipeline creation fails because of a
> resource
>         allocation failure (Step setup_resource_disks_harness19: Set up
> of resource
>         disks_harness failed: Unable to create data disk(s): One or more
> operations
>         had an error: [QUOTA_EXCEEDED] 'Quota 'DISKS_TOTAL_GB' exceeded.
>         Limit: 80000.0) This means, it has requested more than 80 (!) TB
> for the job that
>         operates on 200 GB compressed (or 2 TB uncompressed) files.
>
>         I’ve tried to run it with instances that are as large as
> n1-highmem-16
>         (104 GB memory each) and 1200 GB local storage.
>
>         I know this is a mailing list of Apache Beam and not intended for
> GCDS support,
>         my question is therefore if anyone has faced the issue with the
> SDK before, or
>         if there is a known size limit for files.
>
>
>         Thanks,
>         Tobias
>
>         [0] https://gist.github.com/james-woods/
> 98901f7ef2b405a7e58760057c48162f
>         [1] http://stackoverflow.com/a/40863609/5497956
>         [2] https://puu.sh/tWzkh/49b99477e3.png
>
>
>
>
>
>
>
>
