Thank you for this! You're on the cutting edge of known issues around the
SDK, and why we still call it experimental.

We are looking to add a coder registry in the near term; please take a look
and let us know whether it will handle your use case!
Unfortunately, "general" handling of interfaces (like color.Color), other
than ones that implement concrete Marshal/Unmarshal-style methods, might not
be possible. The reflect package doesn't provide a way to synthetically
produce a value for an interface type, so knowing the concrete type behind
the interface might be the only way. See how the encoding/gob package
handles them <> for an example of the difficulty around general interfaces.

Most of the issues around scaling are going to be solved with
SplittableDoFns, hopefully within the next two quarters, at least for the
first representative IO.

The issue for autoscaling isn't CPU load; it's that there's no way for the
SDK to tell a runner "Hey, I have too much work, can someone else do X, Y
and Z for me?". As a result, IOs are constrained to a single machine, since
they can't split the work. This means that all the data needs to be read in
at once, which can lead to memory retention.
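To make the "can someone else do X, Y and Z for me?" idea concrete, here is a minimal sketch of dynamic work splitting over an offset range. OffsetRange and TrySplit are hypothetical names for illustration only; they are not the Go SDK's SplittableDoFn API, just the general shape of handing a residual back to a runner.

```go
package main

import "fmt"

// OffsetRange is a hypothetical restriction: the half-open range [Start, End)
// of record offsets that one worker is responsible for.
type OffsetRange struct {
	Start, End int64
}

// TrySplit is called while a worker processing r has reached offset pos.
// It keeps the first fraction of the remaining work [pos, End) in the
// primary and returns the rest as a residual that a runner could schedule
// on another worker. ok is false when the split would leave either side
// with no remaining work.
func (r OffsetRange) TrySplit(pos int64, fraction float64) (primary, residual OffsetRange, ok bool) {
	if pos < r.Start {
		pos = r.Start
	}
	splitAt := pos + int64(float64(r.End-pos)*fraction)
	if splitAt <= pos || splitAt >= r.End {
		return r, OffsetRange{}, false
	}
	return OffsetRange{Start: r.Start, End: splitAt}, OffsetRange{Start: splitAt, End: r.End}, true
}

func main() {
	work := OffsetRange{Start: 0, End: 1000}
	// After processing records 0..199, offer half of what is left to someone else.
	primary, residual, ok := work.TrySplit(200, 0.5)
	fmt.Println(primary, residual, ok)
}
```

This prints `{0 600} {600 1000} true`: the original worker keeps offsets up to 600, and the runner is free to give 600 to 1000 to another machine instead of one worker reading everything.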

The other issues around scaling should be resolved by better metrics
handling, also in progress, which will give the runner estimates of the
sizes of PCollections and bundles. Between the two, the memory issues should
be solved, since the runner can better estimate how much memory a given
amount of work requires and divide the work accordingly.

Thanks again!

On Sun, 6 Jan 2019 at 13:31 Andrew Brampton <> wrote:

> Sorry, I got busy over the holidays and never wrote up the areas I
> struggled with. Here is a quick bulleted list. I've created JIRA issues (or
> found existing ones) where it seems appropriate:
> * Coder Registry <> - As
> soon as I wanted to send my own custom types, I hit problems. For example,
> my struct contained a color.Color (an interface), which currently can't be
> encoded. However, a simple coder would most likely have fixed my problem.
> * Direct runner doesn't marshal my data
> <> - I would test my
> pipeline using the direct runner, and it would happily run on a sample. When
> I ran it on Dataflow, it would run for an hour, then reach a stage that
> would crash like so:
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error
> received from SDK harness for instruction -224: execute failed: panic:
> reflect: Call using main.HistogramResult as type struct { Key string
> "json:\"key\""; Files []string "json:\"files\""; Histogram
> palette.ColorHistogram "json:\"histogram,omitempty\""; Palette []struct { R
> uint8; G uint8; B uint8; A uint8 } "json:\"palette\"" } goroutine 70
> [running]:
> It is not obvious what that means, but it's because I forgot to register my
> HistogramResult type. I had other similar errors that could easily have
> been spotted by the direct runner, if it had just tried to marshal and
> unmarshal my types at init time. I'm sure there are additional checks the
> direct runner could do.
> * CSV support <> - I
> wanted to read my data from a CSV file. I see the Java API doesn't seem
> to support this, but it was very easy to implement
> <>, and would be an
> easy thing to support.
> * Hard to understand why it failed - Multiple times my Dataflow pipeline
> would fail with an error like:
> "A work item was attempted 4 times without success. Each time the worker
> eventually lost contact with the service."
> Spelunking through the logs didn't turn up much, and I spent a lot of time
> adding more logging, adding a pprof handler, and testing. I came to the
> conclusion that my pipeline was running out of memory, and a worker would
> crash / OOM.
> * Memory leaks - I was using the standard-sized GCE instances
> (n1-standard-1), but I found my CoGBK would end up using a huge amount of
> RAM. I determined this by using pprof, and inspecting the job as it ran.
> Without reading the code, it seems that CoGroupByKey would stage the
> groups in RAM before iterating. My groups weren't too big (no more than 100
> items each), but since it seemed to stage multiple groups at the same time,
> it would run out of RAM. Moving to n1-standard-2 fixed that problem.
> * Auto-scaling - It never seemed to work well, never going above a dozen
> or so workers. When I looked at the CPU usage, most of the workers would be
> at 100%, but a couple would be at 0%, as if they had failed to start,
> stalled, or had no work to do. I didn't know how to debug that, other than
> noting that none of my log lines were printed in those workers' logs. I had
> inserted a reshuffle early in my pipeline, so I thought the work should
> have been partitioned across 1000s of workers. I started to force the number
> of workers <> to get the
> performance I needed.
> * Auto-scaling with two CPUs
> <> - Once I moved
> to n1-standard-2 (two vCPUs), auto-scaling would never scale beyond one
> worker. I guess the Go Beam API isn't multi-threaded, because even when I
> forced the number of workers, each worker would never exceed 50% CPU.
> <> I suspect the auto-scaling is looking
> at CPU to decide when more workers are needed.
> * GCS - The version of the GCS library was old, and it didn't support
> context/timeouts. This caused my pipeline to stall and never complete.
> Specifically, a connection to GCS should have timed out, but was hanging
> for some reason, causing my pipeline to make no progress for hours. I filed
> a couple <> of issues
> <> around this, and have
> already fixed them. But anywhere we call an external service, we should
> follow best practices around timeouts, retries, backoffs, etc.
> * Reshuffle / AddKey / others - I'm glad that with SplittableDoFn,
> reshuffle won't be needed, but until then it might be useful. I also
> added an AddKey <> function,
> which allows the user to pass a simple function that takes a value and
> returns a KV<key, value>. Kind of the opposite of DropKey
> <>. Feel
> free to use my implementation
> <>. For
> example:
> // Return a new PCollection<KV<string, Painting>> where the key is
> // the artist.
> paintingsByArtist := morebeam.AddKey(s, func(painting Painting) string {
> 	return painting.Artist
> }, paintings)
> * Elements added always blank
> <> - As seen here
> <>, the "elements added" for input and
> output collections was always empty.
> * Wall time always very small
> <> - Similar to above, my
> pipeline would run for hours, but the wall time would be seconds
> <>. That can't be right.
> I think that's it.
> Even though I found lots of rough edges, I was able to get done what I
> wanted to do, and it worked well in the end. I'd like to thank everyone for
> their hard work! This was also the first Flume/Dataflow pipeline I've ever
> written, so perhaps there are best practices I was missing.
> Thanks
> Andrew
> On Mon, 17 Dec 2018 at 08:39, Robert Burke <> wrote:
>> Thanks for the excellent article!
>> It's great to see what the experience is like from an outside
>> perspective, and it's comforting that it mirrors my own. It means I'm not
>> missing much.
>> It's been on my to-do list to make the Go SDK direct runner more robust,
>> so transitioning to other runners wouldn't be such a burden. I'd love for
>> it to have better error messages, and be more useful for testing.
>> Daniel Oliveira recently updated the Universal Runner guide to include
>> how to run Go SDK jobs against it. It should also catch the same issues,
>> and provides a free way to check that pipelines are correct. It has the
>> same single machine limitation though.
>> User-defined coders and pointer elements (and their semantics) are
>> something I've been thinking about as well and will be working on within
>> the next month. JSON is ok for debugging but less so for performance at
>> scale. Let me know if you have any opinions on that! I intend to post to
>> the list about it this week.
>> As for reshuffle, other IOs, and scalability, as mentioned in the
>> roadmap, we're mostly blocked on SplittableDoFn support. With it, we
>> wouldn't need to reshuffle, and would gain more natural scaling of IOs.
>> Once the Go SDK has these, it will be well on its way to not being
>> experimental. :)
>> Finally, I'm obligated to mention that while the SDK works on Dataflow
>> it's not yet officially supported by the service.
>> Cheers
>> Robert B
>> On Mon, Dec 17, 2018, 8:07 AM Kenneth Knowles <> wrote:
>>> Nice!
>>> It reads really well. For the benefit of this list, would you be willing
>>> to summarize the rough edges (and maybe the "couple of other things" you
>>> had to implement) in a few bullet points, and/or file Jira issues if they
>>> are clear feature requests or bugs?
>>> Kenn
>>> On Mon, Dec 17, 2018 at 10:40 AM Andrew Brampton <>
>>> wrote:
>>>> Hey all,
>>>> I've recently been playing with the Go Beam SDK running on Dataflow. I
>>>> wrote up a tutorial for today's Go Advent blog.
>>>> Feel free to check it out:
>>>> Feedback is welcome. I know the Go SDK is experimental, but I hit a
>>>> few rough edges. I also had to implement my own csvio, reshuffle, and a
>>>> couple of other things. I will be sharing my feedback on using Go and
>>>> Dataflow shortly.
>>>> Thanks
>>>> Andrew
