On 10/2/19 4:30 AM, Reuven Lax wrote:
On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský <[email protected]> wrote:

> The fact that the annotation on the ParDo "changes" the GroupByKey implementation is very specific to the Spark runner implementation.

I don't quite agree. It is not specific to Spark; it is specific to all runners that produce grouped elements in a way that is not reiterable. That is the key property. The example you gave with HDFS does not satisfy this condition (files on HDFS are certainly reiterable), and that's why no change to the GBK is needed there - it already has the required property. A quick look at the FlinkRunner (at least the non-portable one) shows that it implements GBK by reducing elements into a List. That is going to crash on a big PCollection, which is even nicely documented:

  * <p>For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this is
  * expected to crash!

If this is fixed, it is likely to start behaving the same as Spark. So I actually think the opposite is true - Dataflow is the special case, because of how its internal shuffle service works.

I think you misunderstood - I was not trying to dish on the Spark runner. Rather, my point is that whether the GroupByKey implementation is affected is runner dependent: in some runners it is and in others it isn't. However, in all cases the /semantics/ of the ParDo is affected. Since Beam tries as much as possible to be runner agnostic, we should default to making the change where there is an obvious semantic difference.
I understand that, but I just don't think that semantics should be affected by this. If the outcome of a GBK is an Iterable, then it should be reiterable - that is how Iterable works - so I now lean more towards the conclusion that the current behavior of the Spark runner simply breaks this contract. A solution would be to introduce the proposed GroupByKeyOneShot or Reduce(By|Per)Key (a sketch follows below).
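For concreteness, here is a minimal sketch of what a Reduce(By|Per)Key could look like. The name ReducePerKey and the ReduceFn interface are assumptions for illustration, not an existing Beam API. Because the reduction is a single forward pass over each group, a runner could expand this over a one-shot grouping without breaking any contract:

  import java.io.Serializable;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.GroupByKey;
  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  /** Hypothetical sketch: folds each group in a single forward pass. */
  class ReducePerKey<K, V>
      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {

    interface ReduceFn<V> extends Serializable {
      V apply(V accumulator, V value);
    }

    private final V identity;  // must itself be serializable in practice
    private final ReduceFn<V> fn;

    ReducePerKey(V identity, ReduceFn<V> fn) {
      this.identity = identity;
      this.fn = fn;
    }

    @Override
    public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
      return input
          .apply(GroupByKey.<K, V>create())
          .apply(ParDo.of(new DoFn<KV<K, Iterable<V>>, KV<K, V>>() {
            @ProcessElement
            public void process(ProcessContext c) {
              V acc = identity;
              // Single forward pass; the Iterable is never re-iterated.
              for (V v : c.element().getValue()) {
                acc = fn.apply(acc, v);
              }
              c.output(KV.of(c.element().getKey(), acc));
            }
          }));
    }
  }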
> In general I sympathize with the worry about non-local effects. Beam is already full of them (e.g. a Window.into statement affects downstream GroupByKeys). In each case where they were added there was extensive debate and discussion (windowing semantics were debated for many months), exactly because there was concern over adding these non-local effects. In every case, no other good solution could be found. For the case of windowing, for example, it was often easy to propose simple local APIs (e.g. just pass the window fn as a parameter to GroupByKey), however all of these local solutions ended up not working for important use cases when we analyzed them more deeply.

That is very interesting. Could you elaborate on some examples of the use cases which didn't work? I'd like to try to match them against how Euphoria is structured; it should be more resistant to these non-local effects, because it very often bundles multiple Beam primitives into a single transform. ReduceByKey is one example of this: it is actually a mix of Window.into() + GBK + ParDo. Although it might look like such a transform is not primitive (since it can be broken down into something else; Euphoria has no native equivalent of GBK itself), this has several other nice implications - namely that Combine becomes a special case of RBK. It then becomes only a question of where and how you can "run" the reduce function; the logic is exactly the same. This can be worked out in more detail to show that even Combine and RBK can be described by a more general stateful operation (ReduceStateByKey), so that Euphoria actually has only two really "primitive" operations - FlatMap (basically a stateless ParDo) and RSBK. As I already mentioned on another thread, once stateful ParDo supports merging windows, it can be shown that both Combine and GBK become special cases of it (see the sketch below).

> As you mentioned below, I do think it's perfectly reasonable for a DSL to impose its own semantics. Scio already does this - the raw Beam API is used by a DSL as a substrate, but the DSL does not need to blindly mirror the semantics of the raw Beam API - at least in my opinion!

Sure, but currently there is no way for a DSL to "hook" into the runner, so it has to use the raw Beam SDK - and that will fail in cases like this, where Beam actually gives stronger guarantees than the DSL requires. It would be cool if we could find a way to do that - this pretty much aligns with another question raised on the ML, about the possibility to override the default implementation of a PTransform for a specific pipeline.

Jan
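To illustrate the RSBK idea, here is a minimal sketch of a reduction expressed as a stateful ParDo - a per-key running sum kept in value state. All names are illustrative, and note that today's stateful DoFn does not support merging windows, which is exactly the limitation mentioned above:

  import org.apache.beam.sdk.coders.VarLongCoder;
  import org.apache.beam.sdk.state.StateSpec;
  import org.apache.beam.sdk.state.StateSpecs;
  import org.apache.beam.sdk.state.ValueState;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;

  /** Illustrative "reduce state by key": a per-key running sum in value state. */
  class SumStateFn extends DoFn<KV<String, Long>, KV<String, Long>> {

    @StateId("acc")
    private final StateSpec<ValueState<Long>> accSpec =
        StateSpecs.value(VarLongCoder.of());

    @ProcessElement
    public void process(
        ProcessContext c, @StateId("acc") ValueState<Long> acc) {
      Long prev = acc.read();
      long updated = (prev == null ? 0L : prev) + c.element().getValue();
      acc.write(updated);
      // A real RSBK would emit on a timer at window expiry; emitting the
      // running value per element just keeps this sketch short.
      c.output(KV.of(c.element().getKey(), updated));
    }
  }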
On 9/29/19 7:46 PM, Reuven Lax wrote:

Jan,

The fact that the annotation on the ParDo "changes" the GroupByKey implementation is very specific to the Spark runner implementation. You can imagine another runner that simply writes out files in HDFS to implement a GroupByKey - that GroupByKey implementation is agnostic to whether the result will be reiterated or not; in that case it is very much the ParDo implementation that changes to implement a reiterable.

I think you don't like the fact that an annotation on the ParDo has a non-local effect on the implementation of the GroupByKey upstream. However, arguably the non-local effect is just a quirk of how the Spark runner is implemented - in other runners the effect might be local.

In general I sympathize with the worry about non-local effects. Beam is already full of them (e.g. a Window.into statement affects downstream GroupByKeys). In each case where they were added there was extensive debate and discussion (windowing semantics were debated for many months), exactly because there was concern over adding these non-local effects. In every case, no other good solution could be found. For the case of windowing, for example, it was often easy to propose simple local APIs (e.g. just pass the window fn as a parameter to GroupByKey), however all of these local solutions ended up not working for important use cases when we analyzed them more deeply.

As you mentioned below, I do think it's perfectly reasonable for a DSL to impose its own semantics. Scio already does this - the raw Beam API is used by a DSL as a substrate, but the DSL does not need to blindly mirror the semantics of the raw Beam API - at least in my opinion!

Reuven

On Sat, Sep 28, 2019 at 12:26 AM Jan Lukavský <[email protected]> wrote:

I understand the concerns. Still, it looks a little like we want to be able to modify the behavior of an object from inside a submodule - much like if my subprogram accepted a Map interface, but internally said "hey, this is supposed to be a HashMap, please change it accordingly". Because of how a pipeline is constructed, we can do that; the question is whether there really isn't a better solution. What I do not like about the proposed solution:

 1) specifying that the grouped elements are supposed to be iterated only once can be done only on a ParDo, although there are other (higher-level) PTransforms that can consume the output of a GBK

 2) the annotation on ParDo is by definition generic - i.e. it can be used on input which is not the output of a GBK, where it makes no sense

 3) we modify the behavior to unlock some optimizations (or to change the behavior of the GBK itself); users will not understand that

 4) the annotation somewhat arbitrarily modifies the data types passed, which is counter-intuitive and will be a source of confusion

I think a solution that solves the above problems (but brings some new ones, as always :-)) could be to change the output of GBK from PCollection<KV<K, Iterable<V>>> to GroupedPCollection<K, V>. That way we can control which operators consume the grouping (and how), and we can enable these transforms to specify additional parameters (like how they want to consume the grouping). It is obviously a breaking change (although it can probably be made backwards compatible) and it would very likely involve substantial work. But maybe there are other not-yet-discussed options.

Jan
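A rough sketch of what that could look like - GroupedPCollection and both consume methods are hypothetical, purely to make the idea concrete (an Iterator-valued PCollection is not encodable by ordinary coders, so this would need dedicated runner support):

  import java.util.Iterator;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  /**
   * Hypothetical (not in Beam): the output of GBK, where each consumer
   * declares how it wants to consume the grouping, so the runner can pick
   * the cheapest implementation that satisfies all consumers.
   */
  abstract class GroupedPCollection<K, V> {

    /** One-shot consumption: the runner may stream values straight from shuffle. */
    abstract PCollection<KV<K, Iterator<V>>> consumeOnce();

    /** Re-iterable consumption: the runner must materialize or re-read the group. */
    abstract PCollection<KV<K, Iterable<V>>> consumeReiterable();
  }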
On 9/28/19 6:46 AM, Reuven Lax wrote:

In many cases, the writer of the ParDo has no access to the GBK (e.g. the GBK is hidden inside an upstream PTransform that they cannot modify). This is the same reason RequiresStableInput was made a property of the ParDo: the GroupByKey is quite often inaccessible. The car analogy doesn't quite apply here, because the runner does have a full view of the pipeline, so it can satisfy all constraints. The car dealership generally cannot read your mind (thankfully!), so you have to specify what you want. Or to put it another way, the various transforms in a Beam pipeline do not live in isolation. The full pipeline graph is what is executed, and the runner already has to analyze the full graph to run the pipeline (as well as to optimize it).

Reuven

On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský <[email protected]> wrote:

I'd suggest Stream instead of Iterator; it has the same semantics and a much better API.

Still not sure what is wrong with letting the GBK decide this. I have an analogy - if I decide to buy a car, I have to decide *what* car I'm going to buy (by thinking about how I'm going to use it) *before* I buy it. I cannot just buy "a car" and then change it from a minivan to a sports car based on my actual need. Same with the GBK - if I want to be able to reiterate the result, then I should say so in advance.

Jan

On 9/27/19 10:50 PM, Kenneth Knowles wrote:

Good point about sibling fusion requiring this. The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already does imply that the output iterable can be iterated arbitrarily many times. I think this should remain the default, for all the reasons mentioned. We could have opt-in to the weaker KV<K, Iterator<V>> version.

Agree that this is a property of the ParDo. A particular use of a GBK has no idea what is downstream. If you owned the whole pipeline, a special ParDo<Iterator<V>, Foo> would work. But to make the types line up, this would require changes upstream, which is not good. Maybe something like this:

  ParDo<Iterable<V>, Foo> {
    @ProcessElement
    void process(@OneShotIterator Iterator<V> iter) {
      ...
    }
  }

I've described all of this in terms of the Java SDK, so we would need a portable representation for all this metadata.

Kenn

On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax <[email protected]> wrote:

I think the behavior to make explicit is the need to reiterate, not the need to handle large results. How large a result can be handled will always be runner dependent, and each runner will probably have a different definition of large keys. Reiteration, however, is a logical difference in the programming API, therefore I think it makes sense to specify that. The need to reiterate is a property of the downstream ParDo, so it should be specified there - not on the GBK.

Reuven

On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský <[email protected]> wrote:

Ok, I think I understand there might be some benefits of this. Then I'd propose we make this clear on the GBK. If we supported something like this:

  PCollection<?> input = ....;
  input.apply(GroupByKey.withLargeKeys());

then SparkRunner could expand this to repartitionAndSortWithinPartitions only for this PTransform, and fall back to the default (in-memory) in other situations. The default expansion of LargeGroupByKey (let's say) would be the classic GBK, so that only runners that need to make sure they don't break reiterations would expand it differently. WDYT?

Jan
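If it helps, a minimal sketch of that proposal, assuming the name LargeGroupByKey (not an existing Beam API): the composite's default expansion is just the classic GBK, and a runner that needed to could translate the composite natively (e.g. via repartitionAndSortWithinPartitions) instead of expanding it.

  import org.apache.beam.sdk.transforms.GroupByKey;
  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  /**
   * Hypothetical composite: identical semantics to GroupByKey by default,
   * but its presence in the graph lets a runner substitute a large-key
   * friendly (possibly non-reiterable) implementation.
   */
  class LargeGroupByKey<K, V>
      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
    @Override
    public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
      // Default expansion: the classic GBK, so nothing changes for runners
      // that already support reiteration.
      return input.apply(GroupByKey.<K, V>create());
    }
  }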
On 9/27/19 8:56 PM, Reuven Lax wrote:

As I mentioned above, CoGroupByKey already takes advantage of this. Reiterating is not the most common use case, but it's definitely one that comes up. Also keep in mind that this API has supported reiterating for the past five years (since even before the SDK was donated to Apache), so you should assume that users are relying on it, in ways we might not expect. Historically, Google's Flume system had collections that did not support reiterating (they were even called OneShotCollections to make this clear). They were a source of problems and user frustration, which was one reason that in the original Dataflow SDK we made sure these iterables could be reiterated.

Another reason why it's advantageous for a runner to support this is that it allows for better fusion. If two ParDos A and B both read from the same GroupByKey, it is nice to be able to fuse them into one logical operator. For this, you'll probably need a shuffle implementation that allows two independent readers of the same shuffle session.

How easy it is to implement reiterables that don't have to fit in memory will depend on the runner. For Dataflow it's possible because the shuffle session is logically persistent, so the runner can simply reread the shuffle session. For other runners with different shuffle implementations, it might be harder to support both properties.

Maybe we should introduce a new @RequiresReiteration annotation on ParDo? That way the Spark runner can see it and switch to the in-memory version just for groupings consumed by those ParDos. Runners that already support reiteration can ignore the annotation, so it should be backwards compatible.

Reuven

On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <[email protected]> wrote:

I'd like to know the use case. Why would you *need* to actually iterate the grouped elements twice? By definition, the first iteration would have to extract some statistic (or a subset of elements that must fit into memory). This statistic can then be used as another input for the second iteration. Why not calculate the statistic in a separate branch of the pipeline and feed it into the ParDo as a side input? That would definitely be more efficient, because the calculation of the statistic would probably be combinable (not sure if it is absolutely required to be combinable, but it seems probable). Even if the calculation were not combinable, it would be no less efficient than reiterating twice. Why then support multiple iterations (besides the fact that the output of GBK is Iterable)? Am I missing something?

Jan

On 9/27/19 6:36 PM, Reuven Lax wrote:

This should be an element in the compatibility matrix as well.

On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <[email protected]> wrote:

I am pretty surprised that we do not have a @Category(ValidatesRunner) test in GroupByKeyTest that iterates multiple times. That is a major oversight. We should have this test, and it can be disabled by the SparkRunner's configuration.

Kenn

On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <[email protected]> wrote:

The Dataflow version does not spill to disk. However, Spark's design might require spilling to disk if you want this implemented properly.

On Fri, Sep 27, 2019 at 9:08 AM David Morávek <[email protected]> wrote:

Hi,

Spark's GBK is currently implemented using `sortBy(key and value).mapPartition(...)` for non-merging windowing, in order to support large keys and large-scale shuffles. Merging windowing is implemented using the standard GBK (the underlying Spark implementation uses ListCombiner + hash grouping), which is by design unable to support large keys.

As Jan noted, the problem with mapPartition is that its UDF receives an Iterator. The only option here is to wrap this iterator in one that spills to disk once an internal buffer is exceeded (the approach suggested by Reuven). This unfortunately comes with a cost in some cases. The best approach would be to somehow determine that the user wants multiple iterations and only wrap it in a "re-iterator" if necessary. Does anyone have any ideas how to approach this?

D.
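For the record, a compact sketch of the wrapping approach described above: an Iterable that replays already-consumed elements from a buffer, so the one-shot source can be iterated again. The spill-to-disk part is only marked, not implemented, and the class name is illustrative (this sketch is also not thread-safe):

  import java.util.ArrayList;
  import java.util.Iterator;
  import java.util.List;

  /**
   * Illustrative one-shot-to-reiterable wrapper: buffers elements as they
   * are consumed so later iterations can replay them. A production version
   * would spill the buffer to disk once it exceeds a size threshold.
   */
  class ReiterableWrapper<T> implements Iterable<T> {
    private final Iterator<T> source;  // the one-shot iterator
    private final List<T> buffer = new ArrayList<>();

    ReiterableWrapper(Iterator<T> source) {
      this.source = source;
    }

    @Override
    public Iterator<T> iterator() {
      return new Iterator<T>() {
        private int pos = 0;

        @Override
        public boolean hasNext() {
          return pos < buffer.size() || source.hasNext();
        }

        @Override
        public T next() {
          if (pos < buffer.size()) {
            return buffer.get(pos++);  // replay from the buffer
          }
          T value = source.next();
          buffer.add(value);           // TODO: spill to disk past a threshold
          pos++;
          return value;
        }
      };
    }
  }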
On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <[email protected]> wrote:

The Beam API was written to support multiple iterations, and there are definitely transforms that do so. I believe that CoGroupByKey may do this as well with the resulting iterator. I know that the Dataflow runner is able to handle iterables larger than available memory by paging them in from shuffle, which still allows for reiterating. It sounds like Spark is less flexible here?

Reuven

On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <[email protected]> wrote:

+dev <[email protected]>

Lukasz, why do you think that users expect to be able to iterate grouped elements multiple times - besides the fact that the type 'Iterable' obviously suggests it? The way Spark behaves is pretty much analogous to how MapReduce used to work: in certain cases it calls repartitionAndSortWithinPartitions and then does mapPartition, which accepts an Iterator - that is because internally it merge-sorts pre-sorted segments. This approach makes it possible to GroupByKey data sets that are too big to fit into memory (per key). If multiple iterations should be expected by users, we probably should:

 a) include that in the @ValidatesRunner tests

 b) store values in memory on Spark, which will break certain pipelines

Because of (b), I think it would be much better to remove this "expectation" and clearly document that the Iterable is not supposed to be iterated multiple times.

Jan

On 9/27/19 9:27 AM, Jan Lukavský wrote:

I pretty much think so, because that is how Spark works. The Iterable inside is really an Iterator, which cannot be iterated multiple times.

Jan

On 9/27/19 2:00 AM, Lukasz Cwik wrote:

Jan, in Beam users expect to be able to iterate the GBK output multiple times, even from within the same ParDo. Is this something that Beam on the Spark runner never supported?

On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <[email protected]> wrote:

Hi Gershi,

could you please outline the pipeline you are trying to execute? Basically, you cannot iterate the Iterable multiple times in a single ParDo. It should be possible, though, to apply multiple ParDos to the output of the GroupByKey - see the sketch at the end of this thread.

Jan

On 9/26/19 3:32 PM, Gershi, Noam wrote:

Hi,

I want to iterate multiple times over the Iterable<V> (the output of the GroupByKey transformation). When my runner is SparkRunner, I get an exception:

  Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once, otherwise there could be data lost
          at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
          at java.lang.Iterable.spliterator(Iterable.java:101)

I understood I can branch the pipeline after the GroupByKey into multiple transformations and iterate once over the Iterable<V> in each of them. Is there a better way to do that?

*Noam Gershi*
Software Developer
T: +972 (3) 7405718
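To make the suggested workaround concrete, here is a minimal sketch (types and step names chosen only for illustration): branch the pipeline after the GroupByKey and iterate once in each branch, instead of iterating twice inside one ParDo.

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.GroupByKey;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  public class BranchAfterGbk {
    public static void expand(PCollection<KV<String, Long>> input) {
      PCollection<KV<String, Iterable<Long>>> grouped =
          input.apply(GroupByKey.<String, Long>create());

      // Branch 1: one pass over each group to sum the values per key.
      PCollection<KV<String, Long>> sums = grouped.apply("SumPerKey",
          ParDo.of(new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() {
            @ProcessElement
            public void process(ProcessContext c) {
              long sum = 0;
              for (long v : c.element().getValue()) {
                sum += v;
              }
              c.output(KV.of(c.element().getKey(), sum));
            }
          }));

      // Branch 2: an independent single pass to count the values per key.
      PCollection<KV<String, Long>> counts = grouped.apply("CountPerKey",
          ParDo.of(new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() {
            @ProcessElement
            public void process(ProcessContext c) {
              long count = 0;
              for (Long ignored : c.element().getValue()) {
                count++;
              }
              c.output(KV.of(c.element().getKey(), count));
            }
          }));
    }
  }

Each branch iterates its group exactly once, so this pattern should work even on runners whose grouped values are not reiterable.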
