When joining (Join.leftOuterJoin, etc.) a PCollection<K, V1> to a PCollection<K, V2>, and K:V1 contains hot keys, my pipeline gets very slow. The hot keys can push processing time from hours to days.
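
For concreteness, here is a toy, self-contained sketch of the shape of the join; the String/Long types and the literal values are illustrative only, not our real schema:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.extensions.joinlibrary.Join;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    public class HotKeyJoinSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Left side (K:V1). In the real pipeline a key like "hot" appears many
        // millions of times, which is what slows the join down.
        PCollection<KV<String, Long>> kv1 = p.apply("kv1",
            Create.of(KV.of("hot", 1L), KV.of("hot", 2L), KV.of("cold", 3L)));

        // Right side (K:V2), roughly one value per key.
        PCollection<KV<String, String>> kv2 = p.apply("kv2",
            Create.of(KV.of("hot", "a"), KV.of("cold", "b")));

        // leftOuterJoin needs a stand-in value for unmatched right-side keys.
        PCollection<KV<String, KV<Long, String>>> joined =
            Join.leftOuterJoin(kv1, kv2, "");

        p.run().waitUntilFinish();
      }
    }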
Reading this blog post <https://cloud.google.com/blog/big-data/2016/02/writing-dataflow-pipelines-with-scalability-in-mind> I can see some thought has already been given to this problem: "To address this, we allow you to provide extra parallelism hints using the Combine.PerKey.withHotKeyFanout or Combine.Globally.withFanout. These operations will create an extra step in your pipeline to pre-aggregate the data on many machines before performing the final aggregation on the target machines."

(1 of 2) These two solutions, Combine.PerKey.withHotKeyFanout and Combine.Globally.withFanout, do not help with a join (CoGBK) operation, however. So I solved my problem by adding these stages before and after the join operation, effectively joining K:Iterable<V1> with K:V2:

    // Group the hot-key side first, so each hot key reaches the join as a single element.
    kvIterable1 = kv1.apply("GBK to mitigate hot keys", GroupByKey.<K, V1>create());

    Join.someJoin(kvIterable1, kv2)
        .apply(Values.create())
        .apply("undo hot key GBK", ParDo
            .of(new DoFn<KV<Iterable<V1>, V2>, KV<V1, V2>>() {
              @ProcessElement
              public void fanout(ProcessContext context) {
                // Re-expand the grouped values so downstream sees ordinary KV<V1, V2> pairs.
                for (V1 v1 : context.element().getKey()) {
                  context.output(KV.of(v1, context.element().getValue()));
                }
              }
            }));

Does that look sane to people who have been working with Beam for a long time? It has worked well for us over the last two months or so.

(2 of 2) Lately, the size of the grouped value has grown too large. It took some effort to figure out the problem, which manifested as an ArrayIndexOutOfBoundsException thrown from RandomAccessData.write(). Here's the follow-up solution, which only changes the first half of the above:

    kvIterable1 = kv1
        .apply("GBK to mitigate hot keys", GroupByKey.<K, V1>create())
        .apply("partition grouped values", ParDo
            .of(new DoFn<KV<K, Iterable<V1>>, KV<K, Iterable<V1>>>() {
              @ProcessElement
              public void partition(ProcessContext context) {
                K k = context.element().getKey();
                Iterable<V1> v1Iterable = context.element().getValue();
                // Guava's Iterables.partition caps each grouped value at 1,000,000 elements.
                for (List<V1> partition : Iterables.partition(v1Iterable, 1000000)) {
                  context.output(KV.<K, Iterable<V1>>of(k, partition));
                }
              }
            }));

Again, is this sane? Initial testing suggests this is a good solution.

Jacob