There are different translations of streaming and batch Pipelines in SparkRunner; this thread was focused on the batch part, if I understand it correctly. Unbounded PCollections are not supported in batch Spark (by definition). I agree that fixing the splitting is a valid option, though it still requires an unnecessarily big heap for buffering and/or might introduce some overhead from splitting the restriction. Not to mention that splitting is somewhat optional in the contract of SDF (a bounded DoFn might not support it), so it might not solve the issue for all SDFs. The source might not even be splittable at all (e.g. a completely compressed blob without any blocks).

 Jan

On 1/2/23 16:22, Daniel Collins via dev wrote:
If Spark's SDF solution doesn't support splitting, fixing that seems like the best solution to me. Splitting is the mechanism exposed by the model to actually limit the amount of data produced in a bundle. If it's unsupported, then unbounded-per-element SDFs wouldn't be supported at all.

-Daniel

On Mon, Jan 2, 2023 at 7:46 AM Jan Lukavský <je...@seznam.cz> wrote:

    Hi Jozef,

    I agree that this issue is most likely specific to Spark, because
    of the functional style Spark uses for flatMap().

    It could be fixed with the following two options:

     a) SparkRunner's SDF implementation does not use splitting - it
    could be fixed so that the SDF is stopped after N buffered
    elements via trySplit, the buffer is flushed, and the restriction
    is then resumed

     b) alternatively, use two threads and a BlockingQueue between
    them, which is what you propose

    The number of output elements per input element is bounded (we are
    talking about the batch case anyway), but bounded does not mean it
    has to fit in memory. Furthermore, unnecessarily buffering a large
    number of elements is memory-inefficient, which is why I think the
    two-thread approach (b), sketched below, should be the most
    efficient. Option (a) seems orthogonal and might be implemented as
    well.
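
    To illustrate (b), a minimal, self-contained sketch of the
    two-thread idea - plain Java, not the actual SparkRunner code; the
    queue size and element counts are made up. The processing thread
    pushes outputs into a bounded BlockingQueue, and the consuming
    thread drains it through the Iterator contract that Spark expects:

    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class TwoThreadSketch {
      private static final Object DONE = new Object(); // end-of-output marker

      public static void main(String[] args) {
        // Bounded queue => the producing thread blocks instead of growing the heap.
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1024);

        // "Processing" thread: stands in for DoFn.processElement emitting
        // a possibly huge number of outputs for a single input element.
        Thread producer = new Thread(() -> {
          try {
            for (long i = 0; i < 1_000_000L; i++) {
              queue.put("output-" + i);
            }
            queue.put(DONE);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
        producer.start();

        // "Spark" side: drains the queue through the Iterator contract.
        Iterator<Object> outputs = new Iterator<Object>() {
          private Object next;

          @Override
          public boolean hasNext() {
            if (next == null) {
              try {
                next = queue.take();
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
              }
            }
            return next != DONE;
          }

          @Override
          public Object next() {
            if (!hasNext()) {
              throw new NoSuchElementException();
            }
            Object result = next;
            next = null;
            return result;
          }
        };

        long count = 0;
        while (outputs.hasNext()) {
          outputs.next();
          count++;
        }
        System.out.println("consumed " + count + " elements");
      }
    }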

    This raises the question of how the runner should determine whether
    to apply such a special translation of an SDF. There are probably
    only these options:

     1) translate all SDFs to two-thread execution

     2) add a runtime flag that turns the translation on (once
    turned on, it will translate all SDFs) - this is the current proposal

     3) extend the @DoFn.BoundedPerElement annotation with some kind of
    (optional) hint - e.g.
    @DoFn.BoundedPerElement(Bounded.POSSIBLY_HUGE); the default would
    be Bounded.FITS_IN_MEMORY (which is the current assumption)

    The approach (3), sketched below, seems to give more information to
    all runners and might enable various optimizations across multiple
    runners, so I'd say that this might be the ideal variant.
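
    To make (3) concrete, a purely hypothetical sketch - the Bounded
    enum and the annotation parameter do not exist in the Beam API
    today, and FileSplit/Record are placeholder types:

    // Hypothetical only - Bounded and the annotation parameter are NOT
    // part of the Beam API; FileSplit and Record are placeholder types.
    public enum Bounded {
      FITS_IN_MEMORY,  // default: runner may buffer all outputs of one element
      POSSIBLY_HUGE    // hint: outputs may not fit in memory, stream them
    }

    @DoFn.BoundedPerElement(Bounded.POSSIBLY_HUGE)
    static class ReadSplitFn extends DoFn<FileSplit, Record> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // may emit a very large number of records per input split
      }
    }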

      Jan

    On 12/29/22 13:07, Jozef Vilcek wrote:
    I am surprised to hear that the Dataflow runner (which I have never
    used) would have this kind of limitation. I see that the
    `OutputManager` interface is implemented to write to a `Receiver`
    [1], which follows the push model. Do you have a reference I can
    take a look at to review the must-fit-in-memory limitation?

    In Spark, the problem is that the leaf operator pulls data from
    previous ones by consuming an `Iterator` of values. As per your
    suggestion, this is not a problem with `sources`, because they
    hold e.g. the source file and can pull data as it is being
    requested. It gets problematic exactly with SDFs and flatMaps, not
    sources. This could be one of the reasons why SDF performed badly
    on Spark, where the community reported performance degradation [2]
    and increased memory use [3].
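
    To illustrate the pull contract (a simplified sketch, not the
    actual SparkRunner translation): Spark's FlatMapFunction must
    return an Iterator, and a naive implementation that first collects
    all outputs into a list needs the whole per-element output in
    memory before Spark can start pulling:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.spark.api.java.function.FlatMapFunction;

    // Simplified sketch: all outputs for one input element are buffered
    // into a list before the Iterator is handed back to Spark, so the
    // full output of a single element must fit in memory.
    class EagerFlatMap implements FlatMapFunction<String, String> {
      @Override
      public Iterator<String> call(String input) {
        List<String> buffered = new ArrayList<>();
        for (String line : input.split("\n")) {
          buffered.add(line);
        }
        return buffered.iterator();
      }
    }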

    My proposed solution is to, similarly to Dataflow, use a
    `Receiver`-like implementation for DoFns which can output a large
    number of elements. For now, this WIP targets SDFs only.

    [1] https://github.com/apache/beam/blob/v2.43.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java#L285
    [2] https://github.com/apache/beam/pull/14755
    [3] https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17332005&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17332005

    On Wed, Dec 28, 2022 at 8:52 PM Daniel Collins via dev
    <dev@beam.apache.org> wrote:

        I believe that for the Dataflow runner, the result of
        processElement must also fit in memory, so this is not just a
        constraint of the Spark runner.

        The best approach at present might be to convert the source
        from a flatMap to an SDF that reads out chunks of the file at a
        time and supports runner checkpointing (i.e. with a file seek
        point to resume from). That chunks your data in a way that
        doesn't require the runner to support unbounded outputs from
        any individual @ProcessElement downcall; a rough sketch of that
        shape follows.
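
        A minimal sketch, assuming the file is addressed by byte
        offsets - fileLength/readChunk and the chunk size are
        placeholders, not real Beam or user code. The point is that
        the restriction tracker lets the runner split or checkpoint
        between claimed chunks:

        import org.apache.beam.sdk.io.range.OffsetRange;
        import org.apache.beam.sdk.transforms.DoFn;
        import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

        // Sketch only: fileLength/readChunk are placeholder helpers.
        @DoFn.BoundedPerElement
        class ReadFileInChunksFn extends DoFn<String, byte[]> {
          private static final long CHUNK_SIZE = 1 << 20; // 1 MiB per claim

          @GetInitialRestriction
          public OffsetRange initialRestriction(@Element String path) {
            return new OffsetRange(0, fileLength(path));
          }

          @ProcessElement
          public void process(
              @Element String path,
              RestrictionTracker<OffsetRange, Long> tracker,
              OutputReceiver<byte[]> out) {
            // Claim one chunk offset at a time; the runner can split or
            // checkpoint the remaining OffsetRange between claims.
            for (long start = tracker.currentRestriction().getFrom();
                tracker.tryClaim(start);
                start += CHUNK_SIZE) {
              long len = Math.min(CHUNK_SIZE, tracker.currentRestriction().getTo() - start);
              out.output(readChunk(path, start, len));
            }
          }

          private static long fileLength(String path) { return 0L; } // placeholder
          private static byte[] readChunk(String path, long start, long len) {
            return new byte[0]; // placeholder
          }
        }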

        -Daniel

        On Wed, Dec 28, 2022 at 1:36 PM Jozef Vilcek
        <jozo.vil...@gmail.com> wrote:

            Hello,

            I am working on an issue which currently limits the Spark
            runner by requiring the result of processElement to fit in
            memory [1]. This is problematic e.g. for flatMap, where the
            input element is a file split and generates a possibly
            large output.

            The intended fix is to add an option to run DoFn processing
            of the input in one thread, while consuming the outputs and
            forwarding them to downstream operators in another thread
            (a rough sketch of such an output manager is below). One
            challenge for me is to identify which DoFns should be using
            this async approach.
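
            To make the intent clearer, a rough simplified sketch (not
            the actual WIP code) of an OutputManager that hands outputs
            to a consuming thread through a bounded queue instead of
            buffering them; the queue size is arbitrary:

            import java.util.AbstractMap.SimpleEntry;
            import java.util.Map;
            import java.util.concurrent.ArrayBlockingQueue;
            import java.util.concurrent.BlockingQueue;
            import org.apache.beam.runners.core.DoFnRunners.OutputManager;
            import org.apache.beam.sdk.util.WindowedValue;
            import org.apache.beam.sdk.values.TupleTag;

            // Simplified sketch: output() blocks when the queue is full,
            // so memory use stays bounded; a consuming thread drains the
            // queue and forwards elements downstream.
            class AsyncOutputManager implements OutputManager {
              private final BlockingQueue<Map.Entry<TupleTag<?>, WindowedValue<?>>> queue =
                  new ArrayBlockingQueue<>(1024);

              @Override
              public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
                try {
                  queue.put(new SimpleEntry<>(tag, value));
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  throw new RuntimeException(e);
                }
              }

              // Called from the consuming thread.
              Map.Entry<TupleTag<?>, WindowedValue<?>> take() throws InterruptedException {
                return queue.take();
              }
            }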

            Here [2] is a WIP commit which uses async processing only
            for the SDF naive expansion. I would like to get feedback
            on:

            1) does the approach make sense overall

            2) to target DoFns which need async processing (i.e.
            generate a possibly large output), I am currently just
            checking if the DoFn is of the SDF naive expansion type
            [3]. I failed to find a better / more systematic approach
            for identifying which DoFns should benefit from this. I
            would appreciate any thoughts on how to make this better.

            3) config option and validatesRunner tests - do we want to
            make it possible to turn the async DoFn off? If yes, do we
            want to run validatesRunner tests for both options? How do
            I best make sure of that?
            Looking forward to the feedback.
            Best,
            Jozef

            [1] https://github.com/apache/beam/issues/23852
            [2] https://github.com/JozoVilcek/beam/commit/895c4973fe9adc6225fcf35d039e3eb1a81ffcff
            [3] https://github.com/JozoVilcek/beam/commit/895c4973fe9adc6225fcf35d039e3eb1a81ffcff#diff-bd72087119a098aa8c947d0989083ec9a6f2b54ef18da57d50e0978799c79191R362
