On Fri, Jul 21, 2017 at 7:31 AM, Jan Lukavský <[email protected]> wrote:
> Hi,
>
> thanks for the answer. I understand that Beam does not want to
> incorporate in the model a way to handle parallelism (because it is left
> to the runner to decide, which I find good). But there are some use-cases
> where it would be beneficial to force *sequential* processing. That is,
> to make sure that a certain PCollection (or, to state it exactly, each
> window of a PCollection) is processed entirely by a single (fault
> tolerant) instance. The terasort pipeline would then be realizable and I
> don't think that even affects the runners so much. Many of them (actually
> all I know :)) nevertheless have this option to process a "partition" by
> a single "mapper" or "processor".
>
> Would it be possible to add a sequential form of ParDo into the model? Or
> is it strictly against the philosophy?
>

Here are two answers to these questions:

1) GroupByKey yields an iterable per key and window. So with one key, you
have sequential processing per window. In practice, you probably have
parallelism only per key.

2) A stateful ParDo performs sequential processing per key and window. So
with one key you again get sequential processing per window, but in this
case the elements are still separate, so they can span multiple bundles
(the unit of commit/flush in Beam). Again, in practice, you probably only
get parallelism per key.

I don't think these really give you an answer to your first query. There
is also a userland Sorter transform [1] for sorting the iterable that comes
from a GBK.

Hope these tidbits help in some way,

Kenn

[1] https://beam.apache.org/documentation/sdks/java-extensions/#sorter

> Jan
>
> On 07/19/2017 10:48 PM, Vikas RK wrote:
>
> The Beam model doesn't support global sorting; [1] discusses this in
> detail, which you might find useful.
>
> [1] https://lists.apache.org/thread.html/bc0e65a3bb653b8fd0db96bcd4c9da5af71a71af5a5639a472167808@1464278191@%3Cdev.beam.apache.org%3E
>
> On 19 July 2017 at 02:45, Jan Lukavský <[email protected]> wrote:
>
>> Hi all,
>>
>> I'm trying to get a better understanding of Beam's internals for the
>> sake of integration with the Euphoria API as a DSL ([1]), and while
>> trying to wrap Euphoria's abstractions of outputs, I came across a
>> little issue that I'm currently a little stuck with. The issue itself is
>> not important to this question, but it basically boils down to the
>> following: how could I write a Pipeline that works like a terasort
>> benchmark ([2])? That is, I have a randomly distributed dataset (let's
>> suppose the batch case for simplicity), and I want to sort it so that on
>> output I will have N totally sorted partitions. This implies that I can
>> somehow compare the partitions (or partition IDs) on output, so that the
>> following holds: for each pair of partitions X and Y, if partition X is
>> less than partition Y, then all elements in partition X are less than or
>> equal to all elements in partition Y.
>>
>> So far, I have not been able to find a clean solution in Beam. I can do
>> a group-by-key operation (where the *key* would be the partition ID),
>> and then sort the data within the key. But I have issues outputting the
>> sorted data by a ParDo (because it can run in parallel in theory, and
>> therefore I can either lose the sorting, or run into concurrency
>> issues).
>>
>> Would anyone have an idea about how to do this?
>>
>> Thanks for any comments,
>>
>> Jan
>>
>> [1] https://github.com/seznam/euphoria
>>
>> [2] https://hadoop.apache.org/docs/r2.7.1/api/org/apache/hadoop/examples/terasort/package-summary.html
>>
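
For reference, the approach discussed in this thread (use the partition ID as the key, group by key, then sort each key's iterable) can be sketched roughly as follows. This is plain Python that only mimics the semantics; it does not use the Beam API. The function name `range_partition_sort` and the `boundaries` argument are made up for illustration, and the split points are assumed to be known up front (a real terasort-style job would typically derive them by sampling the input).

```python
from bisect import bisect_right
from collections import defaultdict


def range_partition_sort(records, boundaries):
    """Split records into len(boundaries) + 1 range partitions and sort each.

    `boundaries` must be sorted ascending. Partition i receives the values v
    with boundaries[i - 1] <= v < boundaries[i]; the first and last
    partitions are open-ended.
    """
    # "GroupByKey" step: the key is the partition ID of each element.
    groups = defaultdict(list)
    for value in records:
        partition_id = bisect_right(boundaries, value)
        groups[partition_id].append(value)

    # Per-key step: each key's iterable is sorted on its own, so the work
    # is sequential within a key and can be parallel across keys.
    return [sorted(groups.get(pid, [])) for pid in range(len(boundaries) + 1)]


data = [17, 3, 25, 10, 8, 20, 1]
partitions = range_partition_sort(data, [10, 20])
print(partitions)  # [[1, 3, 8], [10, 17], [20, 25]]
```

Because the keys are range-based, every element of partition X is less than or equal to every element of partition Y whenever X < Y, which is the ordering property asked for in the original question. How each sorted partition is then written out in order is still up to the runner and the sink, which is the part this sketch does not address.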
