> the downstream consumer has these requirements.

Blocking should normally be avoided at all costs, but if the downstream operator is required to emit only a fixed number of messages per second, it should enforce that itself, i.e. block once the maximum number of messages for the time period has been reached. This will automatically lead to backpressure in runners like Flink or Dataflow.
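
A rough sketch of what I mean (names are illustrative; note this enforces
the limit per DoFn instance, so a global limit would need to be funneled
through a single worker, e.g. by keying everything to one key):

    import org.apache.beam.sdk.transforms.DoFn;

    // Blocks inside @ProcessElement once the per-second quota is
    // spent, which surfaces as backpressure in runners like Flink.
    class RateLimitFn extends DoFn<String, String> {
      private final long maxPerSecond;
      private transient long windowStart;
      private transient long emitted;

      RateLimitFn(long maxPerSecond) {
        this.maxPerSecond = maxPerSecond;
      }

      @StartBundle
      public void startBundle() {
        windowStart = System.currentTimeMillis();
        emitted = 0;
      }

      @ProcessElement
      public void process(@Element String element, OutputReceiver<String> out)
          throws InterruptedException {
        long now = System.currentTimeMillis();
        if (now - windowStart >= 1000) {
          // A new one-second window has started; reset the quota.
          windowStart = now;
          emitted = 0;
        }
        if (emitted >= maxPerSecond) {
          // Quota spent: block until the current window rolls over.
          Thread.sleep(1000 - (now - windowStart));
          windowStart = System.currentTimeMillis();
          emitted = 0;
        }
        out.output(element);
        emitted++;
      }
    }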

-Max

On 07.10.20 18:30, Luke Cwik wrote:
SplittableDoFns apply to both batch and streaming pipelines. They are allowed to produce an unbounded amount of data and can either self-checkpoint, saying they want to resume later, or the runner will ask them to checkpoint via a split call.

There hasn't been anything concrete on backpressure. There has been work on exposing signals[1] related to IO that a runner can then use intelligently, but throttling isn't one of them yet.

1: https://lists.apache.org/thread.html/r7c1bf68bd126f3421019e238363415604505f82aeb28ccaf8b834d0d%40%3Cdev.beam.apache.org%3E

On Tue, Oct 6, 2020 at 3:51 PM Vincent Marquez <vincent.marq...@gmail.com> wrote:

    Thanks for the response.  Is my understanding correct that
    SplittableDoFns are only applicable to batch pipelines?  I'm
    wondering if there are any proposals to address backpressure needs?
    /~Vincent/


    On Tue, Oct 6, 2020 at 1:37 PM Luke Cwik <lc...@google.com> wrote:

        There is no general backpressure mechanism within Apache Beam.
        Runners should be intelligent about this, but there is currently
        no way for a transform to say "I'm being throttled", so runners
        don't know that throwing more CPUs at the problem won't make it
        go faster.

        You can control how quickly you ingest data on runners that
        support splittable DoFns with SDK-initiated checkpoints, using
        resume delays. A splittable DoFn can return
        ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(10))
        from its @ProcessElement method. See Watch[1] for an example.
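
        Roughly, for a made-up polling source (watermark estimation and
        restriction sizing are elided, and fetchRecord is a stand-in for
        a real read):

            import org.apache.beam.sdk.io.range.OffsetRange;
            import org.apache.beam.sdk.transforms.DoFn;
            import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
            import org.joda.time.Duration;

            // Claims offsets one at a time and periodically asks the
            // runner to checkpoint and resume after a delay.
            @DoFn.UnboundedPerElement
            class PollingReadFn extends DoFn<String, String> {

              @GetInitialRestriction
              public OffsetRange initialRestriction(@Element String source) {
                return new OffsetRange(0, Long.MAX_VALUE); // unbounded
              }

              @ProcessElement
              public ProcessContinuation process(
                  @Element String source,
                  RestrictionTracker<OffsetRange, Long> tracker,
                  OutputReceiver<String> out) {
                long offset = tracker.currentRestriction().getFrom();
                while (tracker.tryClaim(offset)) {
                  out.output(fetchRecord(source, offset));
                  offset++;
                  if (offset % 500 == 0) {
                    // SDK-initiated checkpoint: resume in 10 seconds.
                    return ProcessContinuation.resume()
                        .withResumeDelay(Duration.standardSeconds(10));
                  }
                }
                return ProcessContinuation.stop();
              }

              private String fetchRecord(String source, long offset) {
                return source + "@" + offset; // stand-in for a real read
              }
            }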

        The 2.25.0 release enables more splittable DoFn features on more
        runners. I'm working on a blog post (initial draft[2], still
        mostly empty) to update the old post from 2017.

        1: https://github.com/apache/beam/blob/9c239ac93b40e911f03bec5da3c58a07fdceb245/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L908
        2: https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#


        On Tue, Oct 6, 2020 at 10:39 AM Vincent Marquez
        <vincent.marq...@gmail.com> wrote:

            Hmm, I'm not sure how that will help. I understand how to
            batch up the data; it's the triggering part that I don't
            see how to do. For example, in Spark Structured Streaming
            you can set a time trigger which fires at a fixed interval
            all the way up to the source, so the source can even
            throttle how much data to read.

            Here is my use case more thoroughly explained:

            I have a Kafka topic (with multiple partitions) that I'm
            reading from, and I need to aggregate batches of up to 500
            messages before sending a single batch off in an RPC call.
            However, the vendor specified a rate limit, so if there are
            more than 500 unread messages in the topic, I must wait 1
            second before issuing another RPC call. When searching on
            Stack Overflow I found this answer:
            https://stackoverflow.com/a/57275557/25658
            It makes the problem seem challenging, but I wasn't sure if
            things had changed since then or you had better ideas.
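
            Roughly the shape I have so far (broker and topic names are
            made up, and SendBatchRpcFn is a placeholder for the vendor
            RPC); the batching part is fine, it's the throttling marked
            TODO below that I don't see how to express:

                import org.apache.beam.sdk.io.kafka.KafkaIO;
                import org.apache.beam.sdk.transforms.GroupIntoBatches;
                import org.apache.beam.sdk.transforms.ParDo;
                import org.apache.beam.sdk.values.KV;
                import org.apache.beam.sdk.values.PCollection;
                import org.apache.kafka.common.serialization.StringDeserializer;

                PCollection<KV<String, String>> records =
                    pipeline.apply(
                        KafkaIO.<String, String>read()
                            .withBootstrapServers("broker:9092")
                            .withTopic("events")
                            .withKeyDeserializer(StringDeserializer.class)
                            .withValueDeserializer(StringDeserializer.class)
                            .withoutMetadata());

                records
                    .apply(GroupIntoBatches.<String, String>ofSize(500))
                    // TODO: at most one RPC per second across the topic.
                    .apply(ParDo.of(new SendBatchRpcFn()));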

            /~Vincent/


            On Thu, Oct 1, 2020 at 2:57 PM Luke Cwik <lc...@google.com> wrote:

                Look at the GroupIntoBatches[1] transform. It will
                buffer "batches" of size X for you.

                1: https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
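
                A minimal sketch of the usage (the input must already
                be keyed; each output pairs a key with an Iterable of
                up to X buffered values):

                    // keyed is a PCollection<KV<String, String>>;
                    // batchSize is the X above.
                    PCollection<KV<String, Iterable<String>>> batches =
                        keyed.apply(
                            GroupIntoBatches
                                .<String, String>ofSize(batchSize));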

                On Thu, Oct 1, 2020 at 2:51 PM Vincent Marquez
                <vincent.marq...@gmail.com> wrote:

                    the downstream consumer has these requirements.

                    /~Vincent/


                    On Thu, Oct 1, 2020 at 2:29 PM Luke Cwik
                    <lc...@google.com> wrote:

                        Why do you want to only emit X? (e.g. running
                        out of memory in the runner)

                        On Thu, Oct 1, 2020 at 2:08 PM Vincent Marquez
                        <vincent.marq...@gmail.com> wrote:

                            Hello all.  If I want to 'throttle' the
                            number of messages I pull off of, say,
                            Kafka or some other queue, in order to
                            make sure I only emit X messages per
                            trigger, is there a way to do that and
                            still ensure 'at least once' delivery
                            guarantees?  If this isn't supported,
                            would the better way be to pull a limited
                            amount, as opposed to limiting on the
                            output side?

                            /~Vincent/
