If it is true that the pipeline has no way of enforcing fixed time windows, how does this work:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java#L126

Isn't this supposed to trigger only every five minutes, regardless of how much data can immediately be grouped into five-minute windows? If there is a way to specify that the fixed window should trigger only every so many minutes, that would solve my use case. If there isn't a way to do this, the Kafka offset code seems broken and could result in 'data loss' by improperly committing offsets before the corresponding records have run through the rest of the pipeline.

*~Vincent*

On Fri, Oct 16, 2020 at 4:17 AM Maximilian Michels <m...@apache.org> wrote:

> > the downstream consumer has these requirements.
>
> Blocking should normally be avoided at all costs, but if the downstream
> operator is required to emit only a fixed number of messages per
> second, it should enforce this, i.e. block once the maximum number of
> messages for a time period has been reached. This will automatically
> lead to backpressure in Runners like Flink or Dataflow.
>
> -Max
>
> On 07.10.20 18:30, Luke Cwik wrote:
> > SplittableDoFns apply to both batch and streaming pipelines. They are
> > allowed to produce an unbounded amount of data and can either self
> > checkpoint, saying they want to resume later, or the runner will ask
> > them to checkpoint via a split call.
> >
> > There hasn't been anything concrete on backpressure; there has been work
> > done on exposing signals[1] related to IO that a runner can then use
> > intelligently, but throttling isn't one of them yet.
> >
> > 1: https://lists.apache.org/thread.html/r7c1bf68bd126f3421019e238363415604505f82aeb28ccaf8b834d0d%40%3Cdev.beam.apache.org%3E
> >
> > On Tue, Oct 6, 2020 at 3:51 PM Vincent Marquez
> > <vincent.marq...@gmail.com> wrote:
> >
> > Thanks for the response. Is my understanding correct that
> > SplittableDoFns are only applicable to batch pipelines? I'm
> > wondering if there are any proposals to address backpressure needs.
> >
> > /~Vincent/
> >
> > On Tue, Oct 6, 2020 at 1:37 PM Luke Cwik <lc...@google.com> wrote:
> >
> > There is no general backpressure mechanism within Apache Beam
> > (runners should be intelligent about this, but there is currently
> > no way to say "I'm being throttled", so runners don't know that
> > throwing more CPUs at a problem won't make it go faster).
> >
> > You can control how quickly you ingest data for runners that
> > support splittable DoFns with SDK-initiated checkpoints with
> > resume delays. A splittable DoFn is able to return
> > resume().withResumeDelay(Duration.standardSeconds(10)) from
> > the @ProcessElement method. See Watch[1] for an example.
> >
> > The 2.25.0 release enables more splittable DoFn features on more
> > runners. I'm working on a blog (initial draft[2], still mostly
> > empty) to update the old blog from 2017.
> >
> > 1: https://github.com/apache/beam/blob/9c239ac93b40e911f03bec5da3c58a07fdceb245/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L908
> > 2: https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#
> >
> > On Tue, Oct 6, 2020 at 10:39 AM Vincent Marquez
> > <vincent.marq...@gmail.com> wrote:
> >
> > Hmm, I'm not sure how that will help. I understand how to
> > batch up the data, but it is the triggering part that I
> > don't see how to do. For example, in Spark Structured
> > Streaming, you can set a time trigger that fires at a
> > fixed interval all the way up to the source, so the source
> > can even throttle how much data to read.
> >
> > Here is my use case more thoroughly explained:
> >
> > I have a Kafka topic (with multiple partitions) that I'm
> > reading from, and I need to aggregate batches of up to 500
> > before sending a single batch off in an RPC call. However,
> > the vendor specified a rate limit, so if there are more than
> > 500 unread messages in the topic, I must wait 1 second
> > before issuing another RPC call. When searching on Stack
> > Overflow I found this answer:
> > https://stackoverflow.com/a/57275557/25658 that makes it
> > seem challenging, but I wasn't sure if things had changed
> > since then or you had better ideas.
> >
> > /~Vincent/
> >
> > On Thu, Oct 1, 2020 at 2:57 PM Luke Cwik <lc...@google.com> wrote:
> >
> > Look at the GroupIntoBatches[1] transform. It will
> > buffer "batches" of size X for you.
> >
> > 1: https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
> >
> > On Thu, Oct 1, 2020 at 2:51 PM Vincent Marquez
> > <vincent.marq...@gmail.com> wrote:
> >
> > the downstream consumer has these requirements.
> >
> > /~Vincent/
> >
> > On Thu, Oct 1, 2020 at 2:29 PM Luke Cwik <lc...@google.com> wrote:
> >
> > Why do you want to only emit X? (e.g. running
> > out of memory in the runner)
> >
> > On Thu, Oct 1, 2020 at 2:08 PM Vincent Marquez
> > <vincent.marq...@gmail.com> wrote:
> >
> > Hello all. If I want to 'throttle' the
> > number of messages I pull off, say, Kafka or
> > some other queue, in order to make sure I
> > only emit X amount per trigger, is there a
> > way to do that and ensure that I get 'at
> > least once' delivery guarantees? If this
> > isn't supported, would the better way be to
> > pull the limited amount as opposed to doing it
> > on the output side?
> >
> > /~Vincent/
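
For reference, the blocking approach Max suggests in this thread (have the downstream operator enforce the limit itself and let the runner's backpressure do the rest) might look roughly like the plain-Java sketch below. It deliberately uses no Beam API. The class name BatchThrottle and its methods reserve, acquire and partition are hypothetical names made up for this illustration, and the numbers in the usage note (batches of up to 500, one RPC per second) are taken from the use case described earlier in the thread. It only limits calls within a single JVM; a pipeline running on several workers would need to split the budget between them or coordinate externally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical helper, not part of Apache Beam: spaces out batch sends. */
final class BatchThrottle {
    private final long minIntervalNanos;
    private final LongSupplier nanoClock; // injectable so the logic can be tested without sleeping
    private long nextAllowedNanos;
    private boolean started;

    BatchThrottle(long minIntervalMillis, LongSupplier nanoClock) {
        this.minIntervalNanos = TimeUnit.MILLISECONDS.toNanos(minIntervalMillis);
        this.nanoClock = nanoClock;
    }

    /** Reserves the next send slot and returns how many nanoseconds the caller must wait for it. */
    synchronized long reserve() {
        long now = nanoClock.getAsLong();
        // The first call never waits; later calls wait until the previous slot plus the interval.
        long wait = started ? Math.max(0L, nextAllowedNanos - now) : 0L;
        started = true;
        nextAllowedNanos = now + wait + minIntervalNanos;
        return wait;
    }

    /** Blocks the calling thread until the next batch may be sent. */
    void acquire() throws InterruptedException {
        long waitNanos = reserve();
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
    }

    /** Splits items into consecutive batches of at most maxSize elements. */
    static <T> List<List<T>> partition(List<T> items, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += maxSize) {
            batches.add(new ArrayList<>(items.subList(i, Math.min(i + maxSize, items.size()))));
        }
        return batches;
    }
}
```

A caller would create one instance with new BatchThrottle(1000, System::nanoTime) and call acquire() immediately before each RPC, for example from whatever step receives the batches of up to 500 elements. Because acquire() blocks the processing thread, the operator stops consuming input while it waits, which is the behaviour that is expected to surface as backpressure on runners such as Flink or Dataflow.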